#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was slow and consumed too
much memory.

"""

# pylint: disable=R0904
# R0904: Too many public methods

import os
import random
import logging
import time
import itertools

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime


_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource '%s'"
                                    % str(resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type.

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      retries -= 1  # fix: count failed attempts so the loop can terminate
    else:
      raise errors.ConfigurationError("Not able to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
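
# Example usage (editor's sketch, not part of the original module): a job
# reserves a generated resource so concurrent jobs cannot take it, then
# drops the reservation when done. The ec_id and generator are hypothetical.
#
#   trm = TemporaryReservationManager()
#   gen_fn = lambda: "res-%d" % random.randrange(1000)
#   res = trm.Generate(existing=set(["res-1"]), generate_one_fn=gen_fn,
#                      ec_id="job-42")
#   assert trm.Reserved(res)
#   trm.DropECReservations("job-42")  # release once the job completes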


def _MatchNameComponentIgnoreCase(short_name, names):
  """Wrapper around L{utils.text.MatchNameComponent}.

  """
  return utils.MatchNameComponent(short_name, names, case_sensitive=False)


def _CheckInstanceDiskIvNames(disks):
  """Checks if instance's disks' C{iv_name} attributes are in order.

  @type disks: list of L{objects.Disk}
  @param disks: List of disks
  @rtype: list of tuples; (int, string, string)
  @return: List of wrongly named disks, each tuple contains disk index,
    expected and actual name

  """
  result = []

  for (idx, disk) in enumerate(disks):
    exp_iv_name = "disk/%s" % idx
    if disk.iv_name != exp_iv_name:
      result.append((idx, exp_iv_name, disk.iv_name))

  return result
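
# Example (editor's illustration): for an instance whose second disk is
# misnamed "disk/5", _CheckInstanceDiskIvNames would return
# [(1, "disk/1", "disk/5")], i.e. (index, expected name, actual name).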


class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
               accept_foreign=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._getents = _getents
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = netutils.Hostname.GetSysName()
    self._last_cluster_serial = -1
    self._cfg_id = None
    self._context = None
    self._OpenConfig(accept_foreign)

  def _GetRpc(self, address_list):
    """Returns RPC runner for configuration.

    """
    return rpc.ConfigRunner(self._context, address_list)

  def SetContext(self, context):
    """Sets Ganeti context.

    """
    self._context = context

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one MAC address.

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac
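
  # Example (editor's illustration): with a hypothetical cluster mac_prefix
  # of "aa:00:00", _GenerateOneMAC may return "aa:00:00:3f:a2:0b"; only the
  # last three octets are randomized.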

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNdParams(self, node):
    """Get the node params populated with cluster defaults.

    @type node: L{objects.Node}
    @param node: The node we want to know the params for
    @return: A dict with the filled in node params

    """
    nodegroup = self._UnlockedGetNodeGroup(node.group)
    return self._config_data.cluster.FillND(node, nodegroup)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current node, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: unique id of the job for which the id is reserved

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    # pylint: disable=R0914
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    cluster = data.cluster
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(cluster.enabled_hypervisors) -
                   set(cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    cluster.master_node)

    def _helper(owner, attr, value, template):
      try:
        utils.ForceDictType(value, template)
      except errors.GenericError, err:
        result.append("%s has invalid %s: %s" % (owner, attr, err))

    def _helper_nic(owner, params):
      try:
        objects.NIC.CheckParameterSyntax(params)
      except errors.ConfigurationError, err:
        result.append("%s has invalid nicparams: %s" % (owner, err))

    def _helper_ipolicy(owner, params):
      try:
        objects.InstancePolicy.CheckParameterSyntax(params)
      except errors.ConfigurationError, err:
        result.append("%s has invalid instance policy: %s" % (owner, err))

    def _helper_ispecs(owner, params):
      for key, value in params.items():
        if key in constants.IPOLICY_ISPECS:
          fullkey = "ipolicy/" + key
          _helper(owner, fullkey, value, constants.ISPECS_PARAMETER_TYPES)
        else:
          # FIXME: assuming list type
          if key in constants.IPOLICY_PARAMETERS:
            exp_type = float
          else:
            exp_type = list
          if not isinstance(value, exp_type):
            result.append("%s has invalid instance policy: for %s,"
                          " expecting %s, got %s" %
                          (owner, key, exp_type.__name__, type(value)))

    # check cluster parameters
    _helper("cluster", "beparams", cluster.SimpleFillBE({}),
            constants.BES_PARAMETER_TYPES)
    _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
            constants.NICS_PARAMETER_TYPES)
    _helper_nic("cluster", cluster.SimpleFillNIC({}))
    _helper("cluster", "ndparams", cluster.SimpleFillND({}),
            constants.NDS_PARAMETER_TYPES)
    _helper_ipolicy("cluster", cluster.SimpleFillIPolicy({}))
    _helper_ispecs("cluster", cluster.SimpleFillIPolicy({}))

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)
        if nic.nicparams:
          filled = cluster.SimpleFillNIC(nic.nicparams)
          owner = "instance %s nic %d" % (instance.name, idx)
          _helper(owner, "nicparams",
                  filled, constants.NICS_PARAMETER_TYPES)
          _helper_nic(owner, filled)

      # parameter checks
      if instance.beparams:
        _helper("instance %s" % instance.name, "beparams",
                cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)

      # gather the drbd ports for duplicate checks
      for (idx, dsk) in enumerate(instance.disks):
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % idx))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

      wrong_names = _CheckInstanceDiskIvNames(instance.disks)
      if wrong_names:
        tmp = "; ".join(("name of disk %s should be '%s', but is '%s'" %
                         (idx, exp_name, actual_name))
                        for (idx, exp_name, actual_name) in wrong_names)

        result.append("Instance '%s' has wrongly named disks: %s" %
                      (instance.name, tmp))

    # cluster-wide pool of free ports
    for free_port in cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (cluster.highest_used_port, keys[-1]))

    if not data.nodes[cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))
      if node.group not in data.nodegroups:
        result.append("Node '%s' has invalid group '%s'" %
                      (node.name, node.group))
      else:
        _helper("node %s" % node.name, "ndparams",
                cluster.FillND(node, data.nodegroups[node.group]),
                constants.NDS_PARAMETER_TYPES)

    # nodegroups checks
    nodegroups_names = set()
    for nodegroup_uuid in data.nodegroups:
      nodegroup = data.nodegroups[nodegroup_uuid]
      if nodegroup.uuid != nodegroup_uuid:
        result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
                      % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
      if utils.UUID_RE.match(nodegroup.name.lower()):
        result.append("node group '%s' (uuid: '%s') has uuid-like name" %
                      (nodegroup.name, nodegroup.uuid))
      if nodegroup.name in nodegroups_names:
        result.append("duplicate node group name '%s'" % nodegroup.name)
      else:
        nodegroups_names.add(nodegroup.name)
      group_name = "group %s" % nodegroup.name
      _helper_ipolicy(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy))
      _helper_ispecs(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy))
      if nodegroup.ndparams:
        _helper(group_name, "ndparams",
                cluster.SimpleFillND(nodegroup.ndparams),
                constants.NDS_PARAMETER_TYPES)

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()
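
  # Example usage (editor's sketch): callers treat an empty result as a
  # clean configuration; "cfg" is a hypothetical ConfigWriter instance.
  #
  #   errs = cfg.VerifyConfig()
  #   if errs:
  #     raise errors.ConfigurationError("Invalid config: %s" %
  #                                     utils.CommaJoin(errs))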

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)
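
  # Example (editor's illustration) of the DRBD conversion above, with
  # hypothetical values: for a logical_id of
  #   ("node1", "node2", 11000, 0, 1, "secret")
  # and secondary IPs 192.0.2.1/192.0.2.2, SetDiskID on node1 (the primary)
  # sets physical_id to
  #   ("192.0.2.1", 11000, "192.0.2.2", 11000, 0, "secret")
  # i.e. (own ip, port, peer ip, port, own minor, secret).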

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty minor dict), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty minor dict).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map
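
  # Example (editor's illustration) of the returned map's shape, with
  # hypothetical names:
  #   {"node1": {0: "inst1", 1: "inst2"}, "node2": {0: "inst1"}, "node3": {}}
  # Every configured node appears as a key; minors map to instance names.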

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
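
  # Example usage (editor's sketch): allocating one minor per node for a
  # mirrored disk; "cfg" and the names are hypothetical.
  #
  #   minors = cfg.AllocateDRBDMinor(["node1", "node2"], "inst1")
  #   # e.g. [0, 2]: one minor per requested node, in input order
  #   cfg.ReleaseDRBDMinors("inst1")  # only on the error path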

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths; on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetmask(self):
    """Get the netmask of the master node for this cluster.

    """
    return self._config_data.cluster.master_netmask

  @locking.ssynchronized(_config_lock, shared=1)
  def GetUseExternalMipScript(self):
    """Get flag representing whether to use the external master IP setup script.

    """
    return self._config_data.cluster.use_external_mip_script

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetSharedFileStorageDir(self):
    """Get the shared file storage dir for this cluster.

    """
    return self._config_data.cluster.shared_file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPrimaryIPFamily(self):
    """Get cluster primary ip family.

    @return: primary ip family

    """
    return self._config_data.cluster.primary_ip_family

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetworkParameters(self):
    """Get network parameters of the master node.

    @rtype: L{objects.MasterNetworkParameters}
    @return: network parameters of the master node

    """
    cluster = self._config_data.cluster
    result = objects.MasterNetworkParameters(
      name=cluster.master_node,
      ip=cluster.master_ip,
      netmask=cluster.master_netmask,
      netdev=cluster.master_netdev,
      ip_family=cluster.primary_ip_family)

    return result

  @locking.ssynchronized(_config_lock)
  def AddNodeGroup(self, group, ec_id, check_uuid=True):
    """Add a node group to the configuration.

    This method calls group.UpgradeConfig() to fill any missing attributes
    according to their default values.

    @type group: L{objects.NodeGroup}
    @param group: the NodeGroup object to add
    @type ec_id: string
    @param ec_id: unique id for the job to use when creating a missing UUID
    @type check_uuid: bool
    @param check_uuid: add a UUID to the group if it doesn't have one or, if
                       it does, ensure that it does not exist in the
                       configuration already

    """
    self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
    self._WriteConfig()

  def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
    """Add a node group to the configuration.

    """
    logging.info("Adding node group %s to configuration", group.name)

    # Some code might need to add a node group with a pre-populated UUID
    # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
    # the "does this UUID exist already" check.
    if check_uuid:
      self._EnsureUUID(group, ec_id)

    try:
      existing_uuid = self._UnlockedLookupNodeGroup(group.name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                                 " node group (UUID: %s)" %
                                 (group.name, existing_uuid),
                                 errors.ECODE_EXISTS)

    group.serial_no = 1
    group.ctime = group.mtime = time.time()
    group.UpgradeConfig()

    self._config_data.nodegroups[group.uuid] = group
    self._config_data.cluster.serial_no += 1

  @locking.ssynchronized(_config_lock)
  def RemoveNodeGroup(self, group_uuid):
    """Remove a node group from the configuration.

    @type group_uuid: string
    @param group_uuid: the UUID of the node group to remove

    """
    logging.info("Removing node group %s from configuration", group_uuid)

    if group_uuid not in self._config_data.nodegroups:
      raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)

    assert len(self._config_data.nodegroups) != 1, \
            "Group '%s' is the only group, cannot be removed" % group_uuid

    del self._config_data.nodegroups[group_uuid]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  def _UnlockedLookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID
    @raises errors.OpPrereqError: when the target group cannot be found

    """
    if target is None:
      if len(self._config_data.nodegroups) != 1:
        raise errors.OpPrereqError("More than one node group exists. Target"
                                   " group must be specified explicitly.")
      else:
        return self._config_data.nodegroups.keys()[0]
    if target in self._config_data.nodegroups:
      return target
    for nodegroup in self._config_data.nodegroups.values():
      if nodegroup.name == target:
        return nodegroup.uuid
    raise errors.OpPrereqError("Node group '%s' not found" % target,
                               errors.ECODE_NOENT)

  @locking.ssynchronized(_config_lock, shared=1)
  def LookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    This function is just a wrapper over L{_UnlockedLookupNodeGroup}.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID

    """
    return self._UnlockedLookupNodeGroup(target)

  def _UnlockedGetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    if uuid not in self._config_data.nodegroups:
      return None

    return self._config_data.nodegroups[uuid]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    return self._UnlockedGetNodeGroup(uuid)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodeGroupsInfo(self):
    """Get the configuration of all node groups.

    """
    return dict(self._config_data.nodegroups)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupList(self):
    """Get a list of node groups.

    """
    return self._config_data.nodegroups.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupMembersByNodes(self, nodes):
    """Get nodes which are members of the same nodegroups as the given nodes.

    """
    ngfn = lambda node_name: self._UnlockedGetNodeInfo(node_name).group
    return frozenset(member_name
                     for node_name in nodes
                     for member_name in
                       self._UnlockedGetNodeGroup(ngfn(node_name)).members)

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert status in constants.ADMINST_ALL, \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_state != status:
      instance.admin_state = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance as up in the config.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_UP)

  @locking.ssynchronized(_config_lock)
  def MarkInstanceOffline(self, instance_name):
    """Mark the instance as offline in the config.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_OFFLINE)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    # If a network port has been allocated to the instance,
    # return it to the pool of free ports.
    inst = self._config_data.instances[instance_name]
    network_port = getattr(inst, "network_port", None)
    if network_port is not None:
      self._config_data.cluster.tcpudp_port_pool.add(network_port)

    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)

    # Operate on a copy to not lose the instance object in case of a failure
    inst = self._config_data.instances[old_name].Copy()
    inst.name = new_name

    for (idx, disk) in enumerate(inst.disks):
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.logical_id = (disk.logical_id[0],
                           utils.PathJoin(file_storage_dir, inst.name,
                                          "disk%s" % idx))
        disk.physical_id = disk.logical_id

    # Actually replace instance object
    del self._config_data.instances[old_name]
    self._config_data.instances[inst.name] = inst

    # Force update of ssconf files
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the instance as down in the configuration.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_DOWN)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    # Locking is done in L{ConfigWriter.GetInstanceList}
    return _MatchNameComponentIgnoreCase(short_name, self.GetInstanceList())

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    about an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceNodeGroups(self, instance_name, primary_only=False):
    """Returns set of node group UUIDs for instance's nodes.

    @rtype: frozenset

    """
    instance = self._UnlockedGetInstanceInfo(instance_name)
    if not instance:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    if primary_only:
      nodes = [instance.primary_node]
    else:
      nodes = instance.all_nodes

    return frozenset(self._UnlockedGetNodeInfo(node_name).group
                     for node_name in nodes)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiInstanceInfo(self, instances):
    """Get the configuration of multiple instances.

    @param instances: list of instance names
    @rtype: list
    @return: list of tuples (instance, instance_info), where
        instance_info is what GetInstanceInfo would return for that
        name, while keeping the original order

    """
    return [(name, self._UnlockedGetInstanceInfo(name)) for name in instances]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
              GetInstanceInfo would return for that name

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstancesInfoByFilter(self, filter_fn):
    """Get instance configuration with a filter.

    @type filter_fn: callable
    @param filter_fn: Filter function receiving instance object as parameter,
      returning boolean. Important: this function is called while the
      configuration lock is held. It must not do any complex work or call
      functions potentially leading to a deadlock. Ideally it doesn't call any
      other functions and just compares instance attributes.

    """
    return dict((name, inst)
                for (name, inst) in self._config_data.instances.items()
                if filter_fn(inst))
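
  # Example usage (editor's sketch): selecting instances by an attribute
  # with a simple filter; "cfg" is a hypothetical ConfigWriter instance.
  #
  #   drbd_insts = cfg.GetInstancesInfoByFilter(
  #     lambda inst: inst.disk_template == constants.DT_DRBD8)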
1434

    
1435
  @locking.ssynchronized(_config_lock)
1436
  def AddNode(self, node, ec_id):
1437
    """Add a node to the configuration.
1438

1439
    @type node: L{objects.Node}
1440
    @param node: a Node instance
1441

1442
    """
1443
    logging.info("Adding node %s to configuration", node.name)
1444

    
1445
    self._EnsureUUID(node, ec_id)
1446

    
1447
    node.serial_no = 1
1448
    node.ctime = node.mtime = time.time()
1449
    self._UnlockedAddNodeToGroup(node.name, node.group)
1450
    self._config_data.nodes[node.name] = node
1451
    self._config_data.cluster.serial_no += 1
1452
    self._WriteConfig()
1453

    
1454
  @locking.ssynchronized(_config_lock)
1455
  def RemoveNode(self, node_name):
1456
    """Remove a node from the configuration.
1457

1458
    """
1459
    logging.info("Removing node %s from configuration", node_name)
1460

    
1461
    if node_name not in self._config_data.nodes:
1462
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1463

    
1464
    self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
1465
    del self._config_data.nodes[node_name]
1466
    self._config_data.cluster.serial_no += 1
1467
    self._WriteConfig()
1468

    
1469
  def ExpandNodeName(self, short_name):
1470
    """Attempt to expand an incomplete node name.
1471

1472
    """
1473
    # Locking is done in L{ConfigWriter.GetNodeList}
1474
    return _MatchNameComponentIgnoreCase(short_name, self.GetNodeList())
1475

    
1476
  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInstances(self, node_name):
    """Get the instances of a node, as stored in the config.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: (list, list)
    @return: a tuple with two lists: the primary and the secondary instances

    """
    pri = []
    sec = []
    for inst in self._config_data.instances.values():
      if inst.primary_node == node_name:
        pri.append(inst.name)
      if node_name in inst.secondary_nodes:
        sec.append(inst.name)
    return (pri, sec)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupInstances(self, uuid, primary_only=False):
    """Get the instances of a node group.

    @param uuid: Node group UUID
    @param primary_only: Whether to only consider primary nodes
    @rtype: frozenset
    @return: a frozenset of instance names in the node group

    """
    if primary_only:
      nodes_fn = lambda inst: [inst.primary_node]
    else:
      nodes_fn = lambda inst: inst.all_nodes

    return frozenset(inst.name
                     for inst in self._config_data.instances.values()
                     for node_name in nodes_fn(inst)
                     if self._UnlockedGetNodeInfo(node_name).group == uuid)

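  # Usage sketch (illustrative comment only; "group_uuid" is hypothetical):
  #
  #   all_insts = cfg.GetNodeGroupInstances(group_uuid)
  #   pri_insts = cfg.GetNodeGroupInstances(group_uuid, primary_only=True)
  #
  # The second call only matches instances whose primary node is in the
  # group, while the first also matches those with a secondary node there.
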
  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVmCapableNodeList(self):
    """Return the list of nodes which are vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNonVmCapableNodeList(self):
    """Return the list of nodes which are not vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiNodeInfo(self, nodes):
    """Get the configuration of multiple nodes.

    @param nodes: list of node names
    @rtype: list
    @return: list of tuples of (node, node_info), where node_info is
        what GetNodeInfo would return for the node, in the original
        order

    """
    return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
        GetNodeInfo would return for the node

    """
    return self._UnlockedGetAllNodesInfo()

  def _UnlockedGetAllNodesInfo(self):
    """Gets configuration of all nodes.

    @note: See L{GetAllNodesInfo}

    """
    return dict([(node, self._UnlockedGetNodeInfo(node))
                 for node in self._UnlockedGetNodeList()])

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupsFromNodes(self, nodes):
    """Returns groups for a list of nodes.

    @type nodes: list of string
    @param nodes: List of node names
    @rtype: frozenset
    @return: frozenset of node group UUIDs

    """
    return frozenset(self._UnlockedGetNodeInfo(name).group for name in nodes)

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current, desired and possible master candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible) candidate counts, where
        desired is the possible count capped at the candidate pool size

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained) and node.master_capable:
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)

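  # Worked example (illustrative comment only): take five nodes, all
  # master-capable and none drained, one of them offline and exactly one
  # online node already a master candidate, with candidate_pool_size = 3.
  # Then the possible count is 4, the desired count is min(4, 3) = 3 and
  # the current count is 1, so the tuple returned is (1, 3, 4).
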
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current, desired and possible master candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    # Note: the second element of the stats tuple is the *desired* pool
    # size; it is bound to the name mc_max here because it is the value
    # the pool is grown towards.
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions or not node.master_capable):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("MaintainCandidatePool didn't manage to fill the"
                        " candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

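  # Continuing the sketch above (illustrative comment only): given stats of
  # (1, 3, 4), MaintainCandidatePool promotes two eligible nodes picked in
  # random order to master candidate, writes the config once, and returns
  # the two modified L{objects.Node} instances in mod_list.
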
  def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
    """Add a given node to the specified group.

    """
    if nodegroup_uuid not in self._config_data.nodegroups:
      # This can happen if a node group gets deleted between its lookup and
      # when we're adding the first node to it, since we don't keep a lock in
      # the meantime. It's ok though, as we'll fail cleanly if the node group
      # is not found anymore.
      raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
    if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
      self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)

  def _UnlockedRemoveNodeFromGroup(self, node):
    """Remove a given node from its group.

    """
    nodegroup = node.group
    if nodegroup not in self._config_data.nodegroups:
      logging.warning("Node '%s' has unknown node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
      # Nothing more can be done; looking the group up below would raise
      # KeyError
      return
    nodegroup_obj = self._config_data.nodegroups[nodegroup]
    if node.name not in nodegroup_obj.members:
      logging.warning("Node '%s' not a member of its node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
    else:
      nodegroup_obj.members.remove(node.name)

  @locking.ssynchronized(_config_lock)
  def AssignGroupNodes(self, mods):
    """Changes the group of a number of nodes.

    @type mods: list of tuples; (node name, new group UUID)
    @param mods: Node membership modifications

    """
    groups = self._config_data.nodegroups
    nodes = self._config_data.nodes

    resmod = []

    # Try to resolve names/UUIDs first
    for (node_name, new_group_uuid) in mods:
      try:
        node = nodes[node_name]
      except KeyError:
        raise errors.ConfigurationError("Unable to find node '%s'" % node_name)

      if node.group == new_group_uuid:
        # Node is being assigned to its current group
        logging.debug("Node '%s' was assigned to its current group (%s)",
                      node_name, node.group)
        continue

      # Try to find current group of node
      try:
        old_group = groups[node.group]
      except KeyError:
        raise errors.ConfigurationError("Unable to find old group '%s'" %
                                        node.group)

      # Try to find new group for node
      try:
        new_group = groups[new_group_uuid]
      except KeyError:
        raise errors.ConfigurationError("Unable to find new group '%s'" %
                                        new_group_uuid)

      assert node.name in old_group.members, \
        ("Inconsistent configuration: node '%s' not listed in members for its"
         " old group '%s'" % (node.name, old_group.uuid))
      assert node.name not in new_group.members, \
        ("Inconsistent configuration: node '%s' already listed in members for"
         " its new group '%s'" % (node.name, new_group.uuid))

      resmod.append((node, old_group, new_group))

    # Apply changes
    for (node, old_group, new_group) in resmod:
      assert (node.group != new_group.uuid and
              old_group.uuid != new_group.uuid), \
        "Assigning to current group is not possible"

      node.group = new_group.uuid

      # Update members of involved groups
      if node.name in old_group.members:
        old_group.members.remove(node.name)
      if node.name not in new_group.members:
        new_group.members.append(node.name)

    # Update timestamps and serials (only once per node/group object)
    now = time.time()
    for obj in frozenset(itertools.chain(*resmod)): # pylint: disable=W0142
      obj.serial_no += 1
      obj.mtime = now

    # Force ssconf update
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()

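  # Usage sketch (illustrative comment only; the group UUID is
  # hypothetical):
  #
  #   cfg.AssignGroupNodes([("node1.example.com", "uuid-of-group-b"),
  #                         ("node2.example.com", "uuid-of-group-b")])
  #
  # Nodes already in the target group are skipped; all others are moved in
  # one pass and the configuration is written once at the end.
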
  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            self._config_data.nodegroups.values() +
            [self._config_data.cluster])

  def _OpenConfig(self, accept_foreign):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    if data.cluster.master_node != self._my_hostname and not accept_foreign:
      msg = ("The configuration denotes node %s as master, while my"
             " hostname is %s; opening a foreign configuration is only"
             " possible in accept_foreign mode" %
             (data.cluster.master_node, self._my_hostname))
      raise errors.ConfigurationError(msg)

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

    self._cfg_id = utils.GetFileID(path=self._cfg_file)

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if not self._config_data.nodegroups:
      default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
      default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
                                            members=[])
      self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
      modified = True
    for node in self._config_data.nodes.values():
      if not node.group:
        node.group = self.LookupNodeGroup(None)
        modified = True
      # This is technically *not* an upgrade, but needs to be done both when
      # nodegroups are being added, and upon normally loading the config,
      # because the members list of a node group is discarded upon
      # serializing/deserializing the object.
      self._UnlockedAddNodeToGroup(node.name, node.group)
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    # TODO: Use dedicated resolver talking to config writer for name resolution
    result = \
      self._GetRpc(addr_list).call_upload_file(node_list, self._cfg_file)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    getents = self._getents()
    try:
      fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
                               close=False, gid=getents.confd_gid, mode=0640)
    except errors.LockError:
      raise errors.ConfigurationError("The configuration file has been"
                                      " modified since the last write, cannot"
                                      " update")
    try:
      self._cfg_id = utils.GetFileID(fd=fd)
    finally:
      os.close(fd)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = self._GetRpc(None).call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

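  # Editorial note on the write sequence above: the in-memory data is
  # verified (inconsistencies are logged but do not abort the save), the
  # serial number is bumped, the file is replaced atomically (the file ID
  # passed to utils.SafeWriteFile detects concurrent modifications), the
  # result is pushed to all master candidates, and the ssconf files are
  # rewritten whenever the cluster serial number has grown.
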
  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
                  self._config_data.nodegroups.values()]
    nodegroups_data = fn(utils.NiceSort(nodegroups))

    ssconf_values = {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NETMASK: str(cluster.master_netmask),
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      constants.SS_NODEGROUPS: nodegroups_data,
      }
    bad_values = [(k, v) for k, v in ssconf_values.items()
                  if not isinstance(v, (str, basestring))]
    if bad_values:
      err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values)
      raise errors.ConfigurationError("Some ssconf key(s) have non-string"
                                      " values: %s" % err)
    return ssconf_values

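  # Shape sketch (illustrative comment only; the values shown are
  # hypothetical, but every value must be a single string):
  #
  #   {
  #     constants.SS_CLUSTER_NAME: "cluster.example.com",
  #     constants.SS_MASTER_NODE: "node1.example.com",
  #     constants.SS_NODE_LIST: "node1.example.com\nnode2.example.com",
  #     ...
  #   }
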
  @locking.ssynchronized(_config_lock, shared=1)
  def GetSsconfValues(self):
    """Wrapper using lock around L{_UnlockedGetSsconfValues}.

    """
    return self._UnlockedGetSsconfValues()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    """
    return self._config_data.cluster.drbd_usermode_helper

  @locking.ssynchronized(_config_lock)
  def SetDRBDHelper(self, drbd_helper):
    """Set DRBD usermode helper.

    """
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the MAC prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check whether any disk of the given type exists in the configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetClusterInfo) has been updated and
    the caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument is
    the one the caller wants to ensure is saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node}, L{objects.Instance} or L{objects.NodeGroup}
        which exists in the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    elif isinstance(target, objects.NodeGroup):
      test = target in self._config_data.nodegroups.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)

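  # Usage sketch (illustrative comment only; "cfg" is a hypothetical
  # ConfigWriter instance): fetch an object, modify it in place, then
  # persist it:
  #
  #   node = cfg.GetNodeInfo("node1.example.com")
  #   node.offline = True
  #   cfg.Update(node, None)
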
  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations.

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)