#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Cluster related commands"""

# pylint: disable=W0401,W0613,W0614,C0103
# W0401: Wildcard import ganeti.cli
# W0613: Unused argument, since all functions follow the same API
# W0614: Unused import %s from wildcard import (since we need cli)
# C0103: Invalid name gnt-cluster
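
# The sub-commands implemented in this module are declared in the `commands`
# dictionary at the bottom of the file. A few illustrative invocations (the
# host name below is a placeholder, not taken from this file):
#
#   gnt-cluster init cluster.example.com
#   gnt-cluster verify
#   gnt-cluster info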

from cStringIO import StringIO
import os.path
import time
import OpenSSL
import itertools

from ganeti.cli import *
from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import utils
from ganeti import bootstrap
from ganeti import ssh
from ganeti import objects
from ganeti import uidpool
from ganeti import compat
from ganeti import netutils
from ganeti import pathutils


ON_OPT = cli_option("--on", default=False,
                    action="store_true", dest="on",
                    help="Recover from an EPO")

GROUPS_OPT = cli_option("--groups", default=False,
                        action="store_true", dest="groups",
                        help="Arguments are node groups instead of nodes")

FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
                            help="Override interactive check for --no-voting",
                            default=False, action="store_true")

_EPO_PING_INTERVAL = 30 # 30 seconds between pings
_EPO_PING_TIMEOUT = 1 # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
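# These constants drive the post-EPO recovery loop below:
# _RunWhenNodesReachable retries every _EPO_PING_INTERVAL seconds, each TCP
# ping waits at most _EPO_PING_TIMEOUT seconds, and waiting for nodes to come
# back is capped at _EPO_REACHABLE_TIMEOUT seconds overall.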


def _CheckNoLvmStorageOptDeprecated(opts):
  """Checks if the legacy option '--no-lvm-storage' is used.

  """
  if not opts.lvm_storage:
    ToStderr("The option --no-lvm-storage is no longer supported. If you want"
             " to disable lvm-based storage cluster-wide, use the option"
             " --enabled-disk-templates to disable all of these lvm-based"
             " disk templates: %s" %
             utils.CommaJoin(utils.GetLvmDiskTemplates()))
    return 1


@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  if _CheckNoLvmStorageOptDeprecated(opts):
    return 1
  enabled_disk_templates = opts.enabled_disk_templates
  if enabled_disk_templates:
    enabled_disk_templates = enabled_disk_templates.split(",")
  else:
    enabled_disk_templates = constants.DEFAULT_ENABLED_DISK_TEMPLATES

  vg_name = None
  if opts.vg_name is not None:
    vg_name = opts.vg_name
    if vg_name:
      if not utils.IsLvmEnabled(enabled_disk_templates):
        ToStdout("You specified a volume group with --vg-name, but you did not"
                 " enable any disk template that uses lvm.")
    else:
      if utils.IsLvmEnabled(enabled_disk_templates):
        ToStderr("LVM disk templates are enabled, but vg name not set.")
        return 1
  else:
    if utils.IsLvmEnabled(enabled_disk_templates):
      vg_name = constants.DEFAULT_VG

  if not opts.drbd_storage and opts.drbd_helper:
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
    return 1

  drbd_helper = opts.drbd_helper
  if opts.drbd_storage and not opts.drbd_helper:
    drbd_helper = constants.DEFAULT_DRBD_HELPER

  master_netdev = opts.master_netdev
  if master_netdev is None:
    master_netdev = constants.DEFAULT_BRIDGE

  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    fill_all=True)

  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        enabled_disk_templates=enabled_disk_templates,
                        )
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0


@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not opts.yes_do_it:
    ToStderr("Destroying a cluster is irreversible. If you really want to"
             " destroy this cluster, supply the --yes-do-it option.")
    return 1

  op = opcodes.OpClusterDestroy()
  master_uuid = SubmitOpCode(op, opts=opts)
  # if we reached this, the opcode didn't fail; we can proceed to
  # shutdown all the daemons
  bootstrap.FinalizeClusterDestroy(master_uuid)
  return 0


def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])

  new_name = args[0]
  if not opts.force:
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
                " connected over the network to the cluster name, the"
                " operation is very dangerous as the IP address will be"
                " removed from the node and the change may not go through."
                " Continue?") % (cluster_name, new_name)
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterRename(name=new_name)
  result = SubmitOpCode(op, opts=opts, cl=cl)

  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)

  return 0


def ActivateMasterIp(opts, args):
  """Activates the master IP.

  """
  op = opcodes.OpClusterActivateMasterIp()
  SubmitOpCode(op)
  return 0


def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  """
  if not opts.confirm:
    usertext = ("This will disable the master IP. All the open connections to"
                " the master IP will be closed. To reach the master you will"
                " need to use its node IP."
                " Continue?")
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterDeactivateMasterIp()
  SubmitOpCode(op)
  return 0


def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRedistConf()
  SubmitOrSend(op, opts)
  return 0


def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()
  ToStdout("Software version: %s", result["software_version"])
  ToStdout("Internode protocol: %s", result["protocol_version"])
  ToStdout("Configuration format: %s", result["config_version"])
  ToStdout("OS api version: %s", result["os_api_version"])
  ToStdout("Export interface: %s", result["export_version"])
  return 0


def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  master = bootstrap.GetMaster()
  ToStdout(master)
  return 0


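# A small illustration of the helper below (the parameter group and values are
# hypothetical): _FormatGroupedParams({"group1": {"count": 2}}) returns
# {"group1": {"count": "2"}}, i.e. the same nesting with every leaf turned
# into a string (or, with roman=True, into a Roman numeral where possible).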
def _FormatGroupedParams(paramsdict, roman=False):
  """Format Grouped parameters (be, nic, disk) by group.

  @type paramsdict: dict of dicts
  @param paramsdict: {group: {param: value, ...}, ...}
  @rtype: dict of dicts
  @return: copy of the input dictionaries with strings as values

  """
  ret = {}
  for (item, val) in paramsdict.items():
    if isinstance(val, dict):
      ret[item] = _FormatGroupedParams(val, roman=roman)
    elif roman and isinstance(val, int):
      ret[item] = compat.TryToRoman(val)
    else:
      ret[item] = str(val)
  return ret


def ShowClusterConfig(opts, args):
  """Shows cluster information.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()

  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"

  enabled_hv = result["enabled_hypervisors"]
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
                  if k in enabled_hv)

  info = [
    ("Cluster name", result["name"]),
    ("Cluster UUID", result["uuid"]),

    ("Creation time", utils.FormatTime(result["ctime"])),
    ("Modification time", utils.FormatTime(result["mtime"])),

    ("Master node", result["master"]),

    ("Architecture (this node)",
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),

    ("Tags", tags),

    ("Default hypervisor", result["default_hypervisor"]),
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),

    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),

    ("OS-specific hypervisor parameters",
     _FormatGroupedParams(result["os_hvp"])),

    ("OS parameters", _FormatGroupedParams(result["osparams"])),

    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),

    ("Cluster parameters", [
      ("candidate pool size",
       compat.TryToRoman(result["candidate_pool_size"],
                         convert=opts.roman_integers)),
      ("master netdev", result["master_netdev"]),
      ("master netmask", result["master_netmask"]),
      ("use external master IP address setup script",
       result["use_external_mip_script"]),
      ("lvm volume group", result["volume_group_name"]),
      ("lvm reserved volumes", reserved_lvs),
      ("drbd usermode helper", result["drbd_usermode_helper"]),
      ("file storage path", result["file_storage_dir"]),
      ("shared file storage path", result["shared_file_storage_dir"]),
      ("maintenance of node health", result["maintain_node_health"]),
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
      ("default instance allocator", result["default_iallocator"]),
      ("primary ip version", result["primary_ip_version"]),
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
      ("ExtStorage Providers search path",
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
      ("enabled disk templates",
       utils.CommaJoin(result["enabled_disk_templates"])),
      ]),

    ("Default node parameters",
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),

    ("Default instance parameters",
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),

    ("Default nic parameters",
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),

    ("Default disk parameters",
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),

    ("Instance policy - limits for instances",
     FormatPolicyInfo(result["ipolicy"], None, True)),
    ]

  PrintGenericInfo(info)
  return 0


def ClusterCopyFile(opts, args):
513
  """Copy a file from master to some nodes.
514

515
  @param opts: the command line options selected by the user
516
  @type args: list
517
  @param args: should contain only one element, the path of
518
      the file to be copied
519
  @rtype: int
520
  @return: the desired exit code
521

522
  """
523
  filename = args[0]
524
  if not os.path.exists(filename):
525
    raise errors.OpPrereqError("No such filename '%s'" % filename,
526
                               errors.ECODE_INVAL)
527

    
528
  cl = GetClient()
529

    
530
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
531

    
532
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
533
                           secondary_ips=opts.use_replication_network,
534
                           nodegroup=opts.nodegroup)
535

    
536
  srun = ssh.SshRunner(cluster_name)
537
  for node in results:
538
    if not srun.CopyFileToNode(node, filename):
539
      ToStderr("Copy of file %s to node %s failed", filename, node)
540

    
541
  return 0
542

    
543

    
544
def RunClusterCommand(opts, args):
545
  """Run a command on some nodes.
546

547
  @param opts: the command line options selected by the user
548
  @type args: list
549
  @param args: should contain the command to be run and its arguments
550
  @rtype: int
551
  @return: the desired exit code
552

553
  """
554
  cl = GetClient()
555

    
556
  command = " ".join(args)
557

    
558
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
559

    
560
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
561
                                                    "master_node"])
562

    
563
  srun = ssh.SshRunner(cluster_name=cluster_name)
564

    
565
  # Make sure master node is at list end
566
  if master_node in nodes:
567
    nodes.remove(master_node)
568
    nodes.append(master_node)
569

    
570
  for name in nodes:
571
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
572

    
573
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
574
      # Do not output anything for successful commands
575
      continue
576

    
577
    ToStdout("------------------------------------------------")
578
    if opts.show_machine_names:
579
      for line in result.output.splitlines():
580
        ToStdout("%s: %s", name, line)
581
    else:
582
      ToStdout("node: %s", name)
583
      ToStdout("%s", result.output)
584
    ToStdout("return code = %s", result.exit_code)
585

    
586
  return 0
587

    
588

    
589
def VerifyCluster(opts, args):
590
  """Verify integrity of cluster, performing various test on nodes.
591

592
  @param opts: the command line options selected by the user
593
  @type args: list
594
  @param args: should be an empty list
595
  @rtype: int
596
  @return: the desired exit code
597

598
  """
599
  skip_checks = []
600

    
601
  if opts.skip_nplusone_mem:
602
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
603

    
604
  cl = GetClient()
605

    
606
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
607
                               error_codes=opts.error_codes,
608
                               debug_simulate_errors=opts.simulate_errors,
609
                               skip_checks=skip_checks,
610
                               ignore_errors=opts.ignore_errors,
611
                               group_name=opts.nodegroup)
612
  result = SubmitOpCode(op, cl=cl, opts=opts)
613

    
614
  # Keep track of submitted jobs
615
  jex = JobExecutor(cl=cl, opts=opts)
616

    
617
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
618
    jex.AddJobId(None, status, job_id)
619

    
620
  results = jex.GetResults()
621

    
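  # Each entry of 'results' is a (job_success, op_results) pair. The pipeline
  # below transposes those pairs, keeps only the falsy (failed) entries of
  # each column and counts them, giving the number of failed jobs and the
  # number of jobs whose single opcode result was unsuccessful.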
  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode


def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    for iname in instances:
      if iname in missing:
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        nret, msg = FormatError(err)
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      for iname, ival in missing.iteritems():
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode


def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)


@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become the new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if opts.no_voting and not opts.yes_do_it:
    usertext = ("This will perform the failover even if most other nodes"
                " are down, or if this node is outdated. This is dangerous"
                " as it can lead to a non-consistent cluster. Check the"
                " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(usertext):
      return 1

  return bootstrap.MasterFailover(no_voting=opts.no_voting)


def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    cl = GetClient()
    cl.QueryClusterInfo()
    return 0
  except Exception: # pylint: disable=W0703
    return 1


def SearchTags(opts, args):
  """Searches the tags on all objects in the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    return 1
  result = list(result)
  result.sort()
  for path, tag in result:
    ToStdout("%s %s", path, tag)


def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
                        verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
                             the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.

  """
  try:
    pem = utils.ReadFile(cert_filename)
  except IOError, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(err))

  try:
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
  except Exception, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to load certificate: %s" % str(err))

  if verify_private_key:
    try:
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception, err:
      raise errors.X509CertError(cert_filename,
                                 "Unable to load private key: %s" % str(err))

  return pem


def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
                 cds_filename, force):
  """Renews cluster certificates, keys and secrets.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type rapi_cert_filename: string
  @param rapi_cert_filename: Path to file containing new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type spice_cert_filename: string
  @param spice_cert_filename: Path to file containing new SPICE certificate
  @type spice_cacert_filename: string
  @param spice_cacert_filename: Path to file containing the certificate of the
                                CA that signed the SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type cds_filename: string
  @param cds_filename: Path to file containing new cluster domain secret
  @type force: bool
  @param force: Whether to ask user for confirmation

  """
  if new_rapi_cert and rapi_cert_filename:
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
             " options can be specified at the same time.")
    return 1

  if new_cds and cds_filename:
    ToStderr("Only one of the --new-cluster-domain-secret and"
             " --cluster-domain-secret options can be specified at"
             " the same time.")
    return 1

  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
             " and --spice-ca-certificate must not be used.")
    return 1

  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
             " specified.")
    return 1

  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
  try:
    if rapi_cert_filename:
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
    if spice_cert_filename:
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
  except errors.X509CertError, err:
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
    return 1

  if cds_filename:
    try:
      cds = utils.ReadFile(cds_filename)
    except Exception, err: # pylint: disable=W0703
      ToStderr("Can't load new cluster domain secret from %s: %s" %
               (cds_filename, str(err)))
      return 1
  else:
    cds = None

  if not force:
    usertext = ("This requires all daemons on all nodes to be restarted and"
                " may take some time. Continue?")
    if not AskUser(usertext):
      return 1

  def _RenewCryptoInner(ctx):
    ctx.feedback_fn("Updating certificates and keys")
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
                                    new_rapi_cert,
                                    new_spice_cert,
                                    new_confd_hmac_key,
                                    new_cds,
                                    rapi_cert_pem=rapi_cert_pem,
                                    spice_cert_pem=spice_cert_pem,
                                    spice_cacert_pem=spice_cacert_pem,
                                    cds=cds)

    files_to_copy = []

    if new_cluster_cert:
      files_to_copy.append(pathutils.NODED_CERT_FILE)

    if new_rapi_cert or rapi_cert_pem:
      files_to_copy.append(pathutils.RAPI_CERT_FILE)

    if new_spice_cert or spice_cert_pem:
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)

    if new_confd_hmac_key:
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)

    if new_cds or cds:
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)

    if files_to_copy:
      for node_name in ctx.nonmaster_nodes:
        ctx.feedback_fn("Copying %s to %s" %
                        (", ".join(files_to_copy), node_name))
        for file_name in files_to_copy:
          ctx.ssh.CopyFileToNode(node_name, file_name)

  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)

  ToStdout("All requested certificates and keys have been replaced."
           " Running \"gnt-cluster verify\" now is recommended.")

  return 0


def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  """
  return _RenewCrypto(opts.new_cluster_cert,
                      opts.new_rapi_cert,
                      opts.rapi_cert,
                      opts.new_spice_cert,
                      opts.spice_cert,
                      opts.spice_cacert,
                      opts.new_confd_hmac_key,
                      opts.new_cluster_domain_secret,
                      opts.cluster_domain_secret,
                      opts.force)


def SetClusterParams(opts, args):
  """Modify the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not (opts.vg_name is not None or opts.drbd_helper or
          opts.enabled_hypervisors or opts.hvparams or
          opts.beparams or opts.nicparams or
          opts.ndparams or opts.diskparams or
          opts.candidate_pool_size is not None or
          opts.uid_pool is not None or
          opts.maintain_node_health is not None or
          opts.add_uids is not None or
          opts.remove_uids is not None or
          opts.default_iallocator is not None or
          opts.reserved_lvs is not None or
          opts.master_netdev is not None or
          opts.master_netmask is not None or
          opts.use_external_mip_script is not None or
          opts.prealloc_wipe_disks is not None or
          opts.hv_state or
          opts.enabled_disk_templates or
          opts.disk_state or
          opts.ipolicy_bounds_specs is not None or
          opts.ipolicy_std_specs is not None or
          opts.ipolicy_disk_templates is not None or
          opts.ipolicy_vcpu_ratio is not None or
          opts.ipolicy_spindle_ratio is not None or
          opts.modify_etc_hosts is not None or
          opts.file_storage_dir is not None):
    ToStderr("Please give at least one of the parameters.")
    return 1

  if _CheckNoLvmStorageOptDeprecated(opts):
    return 1

  enabled_disk_templates = None
  if opts.enabled_disk_templates:
    enabled_disk_templates = opts.enabled_disk_templates.split(",")

  # consistency between vg name and enabled disk templates
  vg_name = None
  if opts.vg_name is not None:
    vg_name = opts.vg_name
  if enabled_disk_templates:
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
      ToStdout("You specified a volume group with --vg-name, but you did not"
               " enable any of the following lvm-based disk templates: %s" %
               utils.CommaJoin(utils.GetLvmDiskTemplates()))

  drbd_helper = opts.drbd_helper
  if not opts.drbd_storage and opts.drbd_helper:
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
    return 1

  if not opts.drbd_storage:
    drbd_helper = ""

  hvlist = opts.enabled_hypervisors
  if hvlist is not None:
    hvlist = hvlist.split(",")

  # a list of (name, dict) we can pass directly to dict() (or [])
  hvparams = dict(opts.hvparams)
  for hv_params in hvparams.values():
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)

  diskparams = dict(opts.diskparams)

  for dt_params in diskparams.values():
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)

  beparams = opts.beparams
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  nicparams = opts.nicparams
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  ndparams = opts.ndparams
  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  ipolicy = CreateIPolicyFromOpts(
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    )

  mnh = opts.maintain_node_health

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  add_uids = opts.add_uids
  if add_uids is not None:
    add_uids = uidpool.ParseUidPool(add_uids)

  remove_uids = opts.remove_uids
  if remove_uids is not None:
    remove_uids = uidpool.ParseUidPool(remove_uids)

  if opts.reserved_lvs is not None:
    if opts.reserved_lvs == "":
      opts.reserved_lvs = []
    else:
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")

  if opts.master_netmask is not None:
    try:
      opts.master_netmask = int(opts.master_netmask)
    except ValueError:
      ToStderr("The --master-netmask option expects an int parameter.")
      return 1

  ext_ip_script = opts.use_external_mip_script

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  op = opcodes.OpClusterSetParams(
    vg_name=vg_name,
    drbd_helper=drbd_helper,
    enabled_hypervisors=hvlist,
    hvparams=hvparams,
    os_hvp=None,
    beparams=beparams,
    nicparams=nicparams,
    ndparams=ndparams,
    diskparams=diskparams,
    ipolicy=ipolicy,
    candidate_pool_size=opts.candidate_pool_size,
    maintain_node_health=mnh,
    modify_etc_hosts=opts.modify_etc_hosts,
    uid_pool=uid_pool,
    add_uids=add_uids,
    remove_uids=remove_uids,
    default_iallocator=opts.default_iallocator,
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
    master_netdev=opts.master_netdev,
    master_netmask=opts.master_netmask,
    reserved_lvs=opts.reserved_lvs,
    use_external_mip_script=ext_ip_script,
    hv_state=hv_state,
    disk_state=disk_state,
    enabled_disk_templates=enabled_disk_templates,
    force=opts.force,
    file_storage_dir=opts.file_storage_dir,
    )
  SubmitOrSend(op, opts)
  return 0


def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()
  if command in ("drain", "undrain"):
    drain_flag = command == "drain"
    client.SetQueueDrainFlag(drain_flag)
  elif command == "info":
    result = client.QueryConfigValues(["drain_flag"])
    if result[0]:
      val = "set"
    else:
      val = "unset"
    ToStdout("The drain flag is %s" % val)
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0


def _ShowWatcherPause(until):
  if until is None or until < time.time():
    ToStdout("The watcher is not paused.")
  else:
    ToStdout("The watcher is paused until %s.", time.ctime(until))


def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()

  if command == "continue":
    client.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")

  elif command == "pause":
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)

    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
    _ShowWatcherPause(result)

  elif command == "info":
    result = client.QueryConfigValues(["watcher_pause"])
    _ShowWatcherPause(result[0])

  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0


def _OobPower(opts, node_list, power):
  """Puts the nodes in the list into the desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  if power:
    command = constants.OOB_POWER_ON
  else:
    command = constants.OOB_POWER_OFF

  op = opcodes.OpOobCommand(node_names=node_list,
                            command=command,
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)
  result = SubmitOpCode(op, opts=opts)
  errs = 0
  for node_result in result:
    (node_tuple, data_tuple) = node_result
    (_, node_name) = node_tuple
    (data_status, _) = data_tuple
    if data_status != constants.RS_NORMAL:
      assert data_status != constants.RS_UNAVAIL
      errs += 1
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  if errs > 0:
    return False

  return True


def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list into the desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If the instance state should be remembered
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    text_submit, text_success, text_failed = ("startup", "started", "starting")
  else:
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")

  jex = JobExecutor(opts=opts)

  for inst in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, inst)
    op = opcls(instance_name=inst)
    jex.QueueJob(inst, op)

  results = jex.GetResults()
  bad_cnt = len([1 for (success, _) in results if not success])

  if bad_cnt == 0:
    ToStdout("All instances have been %s successfully", text_success)
  else:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  return True


class _RunWhenNodesReachableHelper:
  """Helper class to make sharing internal state easier.

  @ivar success: Indicates if all action_cb calls were successful

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachability (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    self.down = set(node_list)
    self.up = set()
    self.node2ip = node2ip
    self.success = True
    self.action_cb = action_cb
    self.port = port
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

  def __call__(self):
    """When called we run action_cb.

    @raises utils.RetryAgain: When there are still down nodes

    """
    if not self.action_cb(self.up):
      self.success = False

    if self.down:
      raise utils.RetryAgain()
    else:
      return self.success

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    start = time.time()
    for node in self.down:
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
                       live_port_needed=True):
        self.feedback_fn("Node %s became available" % node)
        self.up.add(node)
        self.down -= self.up
        # If we have a node available there is the possibility to run the
        # action callback successfully, therefore we don't wait and return
        return

    self._sleep_fn(max(0.0, start + secs - time.time()))


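# How the helper above is used: utils.Retry repeatedly calls the helper
# object, which raises utils.RetryAgain while any node is still down; between
# calls, Retry invokes helper.Wait, which pings the remaining nodes and
# returns early as soon as one becomes reachable so the action callback can
# run again.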
def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry

  """
  client = GetClient()
  cluster_info = client.QueryClusterInfo()
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IPAddress.family
  else:
    family = netutils.IP6Address.family

  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
                 for node in node_list)

  port = netutils.GetDaemonPort(constants.NODED)
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
                                        ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n  - %s", "  - ".join(helper.down))
    return False


def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditionally, based on node_states.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  start_inst_list = []
  for (inst, nodes) in inst_map.items():
    if not (nodes - nodes_online):
      # All nodes the instance lives on are back online
      start_inst_list.append(inst)

  for inst in start_inst_list:
    del inst_map[inst]

  if start_inst_list:
    return _instance_start_fn(opts, start_inst_list, True)

  return True


def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
                         OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  if node_list and not _OobPower(opts, node_list, False):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS


def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  if not node_list:
    return constants.EXIT_SUCCESS

  if _OobPower(opts, node_list, False):
    return constants.EXIT_SUCCESS
  else:
    return constants.EXIT_FAILURE


def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if cl is None:
    cl = GetClient()

  if opts.groups:
    node_query_list = \
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                           "sinst_list", "powered", "offline"],
                         False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)


def _GetCreateCommand(info):
  buf = StringIO()
  buf.write("gnt-cluster init")
  PrintIPolicyCommand(buf, info["ipolicy"], False)
  buf.write(" ")
  buf.write(info["name"])
  return buf.getvalue()


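# Illustrative output (the cluster name is hypothetical): for a cluster named
# "cluster1", _GetCreateCommand returns a string of the form
# "gnt-cluster init <ipolicy options> cluster1", which ShowCreateCommand
# below prints for the current cluster.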
def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()
  ToStdout(_GetCreateCommand(result))


commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS + [DRY_RUN_OPT, PRIORITY_OPT],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
     DISK_STATE_OPT] + SUBMIT_OPTS +
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  }


#: dictionary with aliases for commands
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}


def Main():
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
                     aliases=aliases)