#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was slow and consuming too
much memory.

"""

# pylint: disable=R0904
# R0904: Too many public methods

import os
import random
import logging
import time
import itertools

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime


_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"


def _ValidateConfig(data):
  """Verifies that a configuration objects looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource '%s'"
                                    % str(resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      # bound the number of attempts so a badly behaved generator
      # cannot make this loop spin forever
      retries -= 1
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
    else:
      raise errors.ConfigurationError("Not able to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
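
# Example (editorial sketch, not part of the original module): how a
# TemporaryReservationManager is typically driven. The execution context
# id "job-42" and the MAC value below are made-up placeholders.
#
#   trm = TemporaryReservationManager()
#   trm.Reserve("job-42", "aa:00:00:12:34:56")
#   assert trm.Reserved("aa:00:00:12:34:56")
#   trm.DropECReservations("job-42")  # release everything held by the job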


def _MatchNameComponentIgnoreCase(short_name, names):
  """Wrapper around L{utils.text.MatchNameComponent}.

  """
  return utils.MatchNameComponent(short_name, names, case_sensitive=False)


class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
               accept_foreign=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._getents = _getents
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = netutils.Hostname.GetSysName()
    self._last_cluster_serial = -1
    self._cfg_id = None
    self._context = None
    self._OpenConfig(accept_foreign)

  def _GetRpc(self, address_list):
    """Returns RPC runner for configuration.

    """
    return rpc.ConfigRunner(self._context, address_list)

  def SetContext(self, context):
    """Sets Ganeti context.

    """
    self._context = context

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one mac address

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac
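
  # Editorial note: with a cluster mac_prefix of, say, "aa:00:00", this
  # yields addresses such as "aa:00:00:5d:0c:91" -- the configured prefix
  # followed by three random bytes.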

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNdParams(self, node):
    """Get the node params populated with cluster defaults.

    @type node: L{objects.Node}
    @param node: The node we want to know the params for
    @return: A dict with the filled in node params

    """
    nodegroup = self._UnlockedGetNodeGroup(node.group)
    return self._config_data.cluster.FillND(node, nodegroup)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve an VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate an unique UUID.

    This checks the current nodes, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate an unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: unique id for the job to reserve the id to

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result
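
  # Editorial note (inferred from _UnlockedSetDiskID below): a DRBD8
  # logical_id is the 6-tuple (node_a, node_b, port, minor_a, minor_b,
  # secret), which is why helper() reads index 5 to get the secret.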

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    # pylint: disable=R0914
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    cluster = data.cluster
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(cluster.enabled_hypervisors) -
                   set(cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    cluster.master_node)

    def _helper(owner, attr, value, template):
      try:
        utils.ForceDictType(value, template)
      except errors.GenericError, err:
        result.append("%s has invalid %s: %s" % (owner, attr, err))

    def _helper_nic(owner, params):
      try:
        objects.NIC.CheckParameterSyntax(params)
      except errors.ConfigurationError, err:
        result.append("%s has invalid nicparams: %s" % (owner, err))

    def _helper_ipolicy(owner, params):
      try:
        objects.InstancePolicy.CheckParameterSyntax(params)
      except errors.ConfigurationError, err:
        result.append("%s has invalid instance policy: %s" % (owner, err))

    def _helper_ispecs(owner, params):
      for key, value in params.items():
        if key in constants.IPOLICY_ISPECS:
          fullkey = "ipolicy/" + key
          _helper(owner, fullkey, value, constants.ISPECS_PARAMETER_TYPES)
        else:
          # FIXME: assuming list type
          if key in constants.IPOLICY_PARAMETERS:
            exp_type = float
          else:
            exp_type = list
          if not isinstance(value, exp_type):
            result.append("%s has invalid instance policy: for %s,"
                          " expecting %s, got %s" %
                          (owner, key, exp_type.__name__, type(value)))

    # check cluster parameters
    _helper("cluster", "beparams", cluster.SimpleFillBE({}),
            constants.BES_PARAMETER_TYPES)
    _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
            constants.NICS_PARAMETER_TYPES)
    _helper_nic("cluster", cluster.SimpleFillNIC({}))
    _helper("cluster", "ndparams", cluster.SimpleFillND({}),
            constants.NDS_PARAMETER_TYPES)
    _helper_ipolicy("cluster", cluster.SimpleFillIPolicy({}))
    _helper_ispecs("cluster", cluster.SimpleFillIPolicy({}))

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)
        if nic.nicparams:
          filled = cluster.SimpleFillNIC(nic.nicparams)
          owner = "instance %s nic %d" % (instance.name, idx)
          _helper(owner, "nicparams",
                  filled, constants.NICS_PARAMETER_TYPES)
          _helper_nic(owner, filled)

      # parameter checks
      if instance.beparams:
        _helper("instance %s" % instance.name, "beparams",
                cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (cluster.highest_used_port, keys[-1]))

    if not data.nodes[cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))
      if node.group not in data.nodegroups:
        result.append("Node '%s' has invalid group '%s'" %
                      (node.name, node.group))
      else:
        _helper("node %s" % node.name, "ndparams",
                cluster.FillND(node, data.nodegroups[node.group]),
                constants.NDS_PARAMETER_TYPES)

    # nodegroups checks
    nodegroups_names = set()
    for nodegroup_uuid in data.nodegroups:
      nodegroup = data.nodegroups[nodegroup_uuid]
      if nodegroup.uuid != nodegroup_uuid:
        result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
                      % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
      if utils.UUID_RE.match(nodegroup.name.lower()):
        result.append("node group '%s' (uuid: '%s') has uuid-like name" %
                      (nodegroup.name, nodegroup.uuid))
      if nodegroup.name in nodegroups_names:
        result.append("duplicate node group name '%s'" % nodegroup.name)
      else:
        nodegroups_names.add(nodegroup.name)
      group_name = "group %s" % nodegroup.name
      _helper_ipolicy(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy))
      _helper_ispecs(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy))
      if nodegroup.ndparams:
        _helper(group_name, "ndparams",
                cluster.SimpleFillND(nodegroup.ndparams),
                constants.NDS_PARAMETER_TYPES)

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()
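
  # Editorial sketch: callers treat a non-empty result as a corrupted
  # configuration. Assuming an existing ConfigWriter instance "cfg":
  #
  #   config_errors = cfg.VerifyConfig()
  #   if config_errors:
  #     raise errors.ConfigurationError("; ".join(config_errors))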

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return
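
  # Editorial note: for DRBD8 the resulting physical_id is therefore
  # (own_ip, port, peer_ip, port, own_minor, secret), i.e. the addressing
  # data of the node the call is made for always comes first.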

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port
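
  # Editorial sketch of the port life cycle: AllocatePort() hands a port
  # out (pool first, then the range above highest_used_port), and
  # AddTcpUdpPort() returns it when the owning object goes away:
  #
  #   port = cfg.AllocatePort()   # take a free TCP/UDP port
  #   ...                         # use it, e.g. for a DRBD disk
  #   cfg.AddTcpUdpPort(port)     # give it back on removal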

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates
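
  # Editorial illustration of the return value, with made-up names:
  #
  #   ({"node1.example.com": {0: "inst1.example.com"},
  #     "node2.example.com": {0: "inst1.example.com", 1: "inst2.example.com"},
  #     "node3.example.com": {}},
  #    [])   # empty duplicates list: the DRBD minor map is consistent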

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
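
  # Editorial example (hypothetical names): allocating both minors of a new
  # DRBD disk in one call; the result order matches the input order.
  #
  #   minors = cfg.AllocateDRBDMinor(
  #       ["node1.example.com", "node2.example.com"], "inst1.example.com")
  #   # e.g. [0, 3] if minor 0 is free on node1 but 0-2 are busy on node2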

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetmask(self):
    """Get the netmask of the master node for this cluster.

    """
    return self._config_data.cluster.master_netmask

  @locking.ssynchronized(_config_lock, shared=1)
  def GetUseExternalMipScript(self):
    """Get flag representing whether to use the external master IP setup script.

    """
    return self._config_data.cluster.use_external_mip_script

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetSharedFileStorageDir(self):
    """Get the shared file storage dir for this cluster.

    """
    return self._config_data.cluster.shared_file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPrimaryIPFamily(self):
    """Get cluster primary ip family.

    @return: primary ip family

    """
    return self._config_data.cluster.primary_ip_family

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetworkParameters(self):
    """Get network parameters of the master node.

    @rtype: L{objects.MasterNetworkParameters}
    @return: network parameters of the master node

    """
    cluster = self._config_data.cluster
    result = objects.MasterNetworkParameters(name=cluster.master_node,
      ip=cluster.master_ip,
      netmask=cluster.master_netmask,
      netdev=cluster.master_netdev,
      ip_family=cluster.primary_ip_family)

    return result

  @locking.ssynchronized(_config_lock)
  def AddNodeGroup(self, group, ec_id, check_uuid=True):
    """Add a node group to the configuration.

    This method calls group.UpgradeConfig() to fill any missing attributes
    according to their default values.

    @type group: L{objects.NodeGroup}
    @param group: the NodeGroup object to add
    @type ec_id: string
    @param ec_id: unique id for the job to use when creating a missing UUID
    @type check_uuid: bool
    @param check_uuid: add an UUID to the group if it doesn't have one or, if
                       it does, ensure that it does not exist in the
                       configuration already

    """
    self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
    self._WriteConfig()

  def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
    """Add a node group to the configuration.

    """
    logging.info("Adding node group %s to configuration", group.name)

    # Some code might need to add a node group with a pre-populated UUID
    # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
    # the "does this UUID" exist already check.
    if check_uuid:
      self._EnsureUUID(group, ec_id)

    try:
      existing_uuid = self._UnlockedLookupNodeGroup(group.name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                                 " node group (UUID: %s)" %
                                 (group.name, existing_uuid),
                                 errors.ECODE_EXISTS)

    group.serial_no = 1
    group.ctime = group.mtime = time.time()
    group.UpgradeConfig()

    self._config_data.nodegroups[group.uuid] = group
    self._config_data.cluster.serial_no += 1

  @locking.ssynchronized(_config_lock)
  def RemoveNodeGroup(self, group_uuid):
    """Remove a node group from the configuration.

    @type group_uuid: string
    @param group_uuid: the UUID of the node group to remove

    """
    logging.info("Removing node group %s from configuration", group_uuid)

    if group_uuid not in self._config_data.nodegroups:
      raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)

    assert len(self._config_data.nodegroups) != 1, \
            "Group '%s' is the only group, cannot be removed" % group_uuid

    del self._config_data.nodegroups[group_uuid]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  def _UnlockedLookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID
    @raises errors.OpPrereqError: when the target group cannot be found

    """
    if target is None:
      if len(self._config_data.nodegroups) != 1:
        raise errors.OpPrereqError("More than one node group exists. Target"
                                   " group must be specified explicitely.")
      else:
        return self._config_data.nodegroups.keys()[0]
    if target in self._config_data.nodegroups:
      return target
    for nodegroup in self._config_data.nodegroups.values():
      if nodegroup.name == target:
        return nodegroup.uuid
    raise errors.OpPrereqError("Node group '%s' not found" % target,
                               errors.ECODE_NOENT)
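
  # Editorial example of the lookup order, with hypothetical values:
  #
  #   cfg.LookupNodeGroup(None)       # only valid with exactly one group
  #   cfg.LookupNodeGroup("d1a2...")  # an existing UUID is returned as-is
  #   cfg.LookupNodeGroup("default")  # a group name resolves to its UUID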

  @locking.ssynchronized(_config_lock, shared=1)
  def LookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    This function is just a wrapper over L{_UnlockedLookupNodeGroup}.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID

    """
    return self._UnlockedLookupNodeGroup(target)

  def _UnlockedGetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    if uuid not in self._config_data.nodegroups:
      return None

    return self._config_data.nodegroups[uuid]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    return self._UnlockedGetNodeGroup(uuid)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodeGroupsInfo(self):
    """Get the configuration of all node groups.

    """
    return dict(self._config_data.nodegroups)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupList(self):
    """Get a list of node groups.

    """
    return self._config_data.nodegroups.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupMembersByNodes(self, nodes):
    """Get nodes which are member in the same nodegroups as the given nodes.

    """
    ngfn = lambda node_name: self._UnlockedGetNodeInfo(node_name).group
    return frozenset(member_name
                     for node_name in nodes
                     for member_name in
                       self._UnlockedGetNodeGroup(ngfn(node_name)).members)

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert status in constants.ADMINST_ALL, \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_state != status:
      instance.admin_state = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_UP)

  @locking.ssynchronized(_config_lock)
  def MarkInstanceOffline(self, instance_name):
    """Mark the instance status to down in the config.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_OFFLINE)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    # If a network port has been allocated to the instance,
    # return it to the pool of free ports.
    inst = self._config_data.instances[instance_name]
    network_port = getattr(inst, "network_port", None)
    if network_port is not None:
      self._config_data.cluster.tcpudp_port_pool.add(network_port)

    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)

    # Operate on a copy so we do not lose the instance object on failure
    inst = self._config_data.instances[old_name].Copy()
    inst.name = new_name

    for (idx, disk) in enumerate(inst.disks):
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.logical_id = (disk.logical_id[0],
                           utils.PathJoin(file_storage_dir, inst.name,
                                          "disk%s" % idx))
        disk.physical_id = disk.logical_id

    # Actually replace instance object
    del self._config_data.instances[old_name]
    self._config_data.instances[inst.name] = inst

    # Force update of ssconf files
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()
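
  # Editorial illustration of the path rewrite above, with made-up values:
  # a file-based disk stored at /srv/ganeti/file-storage/old.example.com/disk0
  # moves to /srv/ganeti/file-storage/new.example.com/disk0 -- only the
  # per-instance directory component of the path changes.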

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_DOWN)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    # Locking is done in L{ConfigWriter.GetInstanceList}
    return _MatchNameComponentIgnoreCase(short_name, self.GetInstanceList())
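
  # Editorial example: with "web1.example.com" and "db1.example.com"
  # configured, ExpandInstanceName("web1") returns "web1.example.com";
  # an unknown or ambiguous fragment yields None.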

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    about an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceNodeGroups(self, instance_name, primary_only=False):
    """Returns set of node group UUIDs for instance's nodes.

    @rtype: frozenset

    """
    instance = self._UnlockedGetInstanceInfo(instance_name)
    if not instance:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    if primary_only:
      nodes = [instance.primary_node]
    else:
      nodes = instance.all_nodes

    return frozenset(self._UnlockedGetNodeInfo(node_name).group
                     for node_name in nodes)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiInstanceInfo(self, instances):
    """Get the configuration of multiple instances.

    @param instances: list of instance names
    @rtype: list
    @return: list of tuples (instance, instance_info), where
        instance_info is what GetInstanceInfo would return for the
        instance, while keeping the original order

    """
    return [(name, self._UnlockedGetInstanceInfo(name)) for name in instances]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
1383
              would GetInstanceInfo return for the node
1384

1385
    """
1386
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
1387
                    for instance in self._UnlockedGetInstanceList()])
1388
    return my_dict
1389

    
1390
  @locking.ssynchronized(_config_lock, shared=1)
1391
  def GetInstancesInfoByFilter(self, filter_fn):
1392
    """Get instance configuration with a filter.
1393

1394
    @type filter_fn: callable
1395
    @param filter_fn: Filter function receiving instance object as parameter,
1396
      returning boolean. Important: this function is called while the
1397
      configuration lock is held. It must not do any complex work or call
      functions potentially leading to a deadlock. Ideally it doesn't call any
      other functions and just compares instance attributes.

    """
    return dict((name, inst)
                for (name, inst) in self._config_data.instances.items()
                if filter_fn(inst))
                if filter_fn(inst))
1405

    
1406
  @locking.ssynchronized(_config_lock)
1407
  def AddNode(self, node, ec_id):
1408
    """Add a node to the configuration.
1409

1410
    @type node: L{objects.Node}
1411
    @param node: a Node instance
1412

1413
    """
1414
    logging.info("Adding node %s to configuration", node.name)
1415

    
1416
    self._EnsureUUID(node, ec_id)
1417

    
1418
    node.serial_no = 1
1419
    node.ctime = node.mtime = time.time()
1420
    self._UnlockedAddNodeToGroup(node.name, node.group)
1421
    self._config_data.nodes[node.name] = node
1422
    self._config_data.cluster.serial_no += 1
1423
    self._WriteConfig()
1424

    
1425
  @locking.ssynchronized(_config_lock)
1426
  def RemoveNode(self, node_name):
1427
    """Remove a node from the configuration.
1428

1429
    """
1430
    logging.info("Removing node %s from configuration", node_name)
1431

    
1432
    if node_name not in self._config_data.nodes:
1433
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1434

    
1435
    self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
1436
    del self._config_data.nodes[node_name]
1437
    self._config_data.cluster.serial_no += 1
1438
    self._WriteConfig()
1439

    
1440
  def ExpandNodeName(self, short_name):
1441
    """Attempt to expand an incomplete node name.
1442

1443
    """
1444
    # Locking is done in L{ConfigWriter.GetNodeList}
1445
    return _MatchNameComponentIgnoreCase(short_name, self.GetNodeList())
1446

    
1447
  def _UnlockedGetNodeInfo(self, node_name):
1448
    """Get the configuration of a node, as stored in the config.
1449

1450
    This function is for internal use, when the config lock is already
1451
    held.
1452

1453
    @param node_name: the node name, e.g. I{node1.example.com}
1454

1455
    @rtype: L{objects.Node}
1456
    @return: the node object
1457

1458
    """
1459
    if node_name not in self._config_data.nodes:
1460
      return None
1461

    
1462
    return self._config_data.nodes[node_name]
1463

    
1464
  @locking.ssynchronized(_config_lock, shared=1)
1465
  def GetNodeInfo(self, node_name):
1466
    """Get the configuration of a node, as stored in the config.
1467

1468
    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1469

1470
    @param node_name: the node name, e.g. I{node1.example.com}
1471

1472
    @rtype: L{objects.Node}
1473
    @return: the node object
1474

1475
    """
1476
    return self._UnlockedGetNodeInfo(node_name)
1477

    
1478
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInstances(self, node_name):
    """Get the instances of a node, as stored in the config.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: (list, list)
    @return: a tuple with two lists: the primary and the secondary instances

    """
    pri = []
    sec = []
    for inst in self._config_data.instances.values():
      if inst.primary_node == node_name:
        pri.append(inst.name)
      if node_name in inst.secondary_nodes:
        sec.append(inst.name)
    return (pri, sec)

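  # Usage sketch (illustrative only; "cfg" stands for any ConfigWriter
  # instance):
  #
  #   (pri, sec) = cfg.GetNodeInstances("node1.example.com")
  #   # pri: instances with the node as primary; sec: instances with
  #   # the node among their secondary nodes (e.g. DRBD peers)
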
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupInstances(self, uuid, primary_only=False):
    """Get the instances of a node group.

    @param uuid: Node group UUID
    @param primary_only: Whether to only consider primary nodes
    @rtype: frozenset
    @return: Set of instance names in node group

    """
    if primary_only:
      nodes_fn = lambda inst: [inst.primary_node]
    else:
      nodes_fn = lambda inst: inst.all_nodes

    return frozenset(inst.name
                     for inst in self._config_data.instances.values()
                     for node_name in nodes_fn(inst)
                     if self._UnlockedGetNodeInfo(node_name).group == uuid)

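  # Usage sketch (illustrative only; "group_uuid" is an assumption):
  # with primary_only=False an instance counts if any of its nodes is
  # in the group, otherwise only its primary node is considered.
  #
  #   all_insts = cfg.GetNodeGroupInstances(group_uuid)
  #   pri_insts = cfg.GetNodeGroupInstances(group_uuid, primary_only=True)
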
  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVmCapableNodeList(self):
    """Return the list of nodes which are vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNonVmCapableNodeList(self):
    """Return the list of nodes which are not vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiNodeInfo(self, nodes):
    """Get the configuration of multiple nodes.

    @param nodes: list of node names
    @rtype: list
    @return: list of tuples of (node, node_info), where node_info is
        what would GetNodeInfo return for the node, in the original
        order

    """
    return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
              would GetNodeInfo return for the node

    """
    return self._UnlockedGetAllNodesInfo()

  def _UnlockedGetAllNodesInfo(self):
    """Gets configuration of all nodes.

    @note: See L{GetAllNodesInfo}

    """
    return dict([(node, self._UnlockedGetNodeInfo(node))
                 for node in self._UnlockedGetNodeList()])

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupsFromNodes(self, nodes):
    """Returns groups for a list of nodes.

    @type nodes: list of string
    @param nodes: List of node names
    @rtype: frozenset

    """
    return frozenset(self._UnlockedGetNodeInfo(name).group for name in nodes)

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current, desired and possible master candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible), i.e. the number of
        current candidates, the desired pool size (capped at the number
        of eligible nodes) and the maximum possible number of candidates

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained) and node.master_capable:
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)

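  # Worked example (numbers are assumptions): in a cluster of five
  # nodes, all master-capable, with one offline, one drained, three
  # master candidates already set and candidate_pool_size=10, this
  # returns (3, 3, 3): three current candidates, a desired size of
  # min(3, 10) = 3 and at most three eligible nodes.
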
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current, desired and possible master candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    # note: the second element of the stats tuple is the desired pool
    # size, which is what this function grows the pool towards
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions or not node.master_capable):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

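  # Usage sketch (illustrative only): callers typically act on the
  # returned nodes, e.g. to announce their new role:
  #
  #   mod_list = cfg.MaintainCandidatePool([])
  #   for node in mod_list:
  #     logging.info("Promoted %s to master candidate", node.name)
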
  def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
    """Add a given node to the specified group.

    """
    if nodegroup_uuid not in self._config_data.nodegroups:
      # This can happen if a node group gets deleted between its lookup and
      # when we're adding the first node to it, since we don't keep a lock in
      # the meantime. It's ok though, as we'll fail cleanly if the node group
      # is not found anymore.
      raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
    if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
      self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)

  def _UnlockedRemoveNodeFromGroup(self, node):
    """Remove a given node from its group.

    """
    nodegroup = node.group
    if nodegroup not in self._config_data.nodegroups:
      logging.warning("Node '%s' has unknown node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
      # nothing else can be done safely here; indexing the unknown
      # group below would only raise a KeyError
      return
    nodegroup_obj = self._config_data.nodegroups[nodegroup]
    if node.name not in nodegroup_obj.members:
      logging.warning("Node '%s' not a member of its node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
    else:
      nodegroup_obj.members.remove(node.name)

  @locking.ssynchronized(_config_lock)
  def AssignGroupNodes(self, mods):
    """Changes the group of a number of nodes.

    @type mods: list of tuples; (node name, new group UUID)
    @param mods: Node membership modifications

    """
    groups = self._config_data.nodegroups
    nodes = self._config_data.nodes

    resmod = []

    # Try to resolve names/UUIDs first
    for (node_name, new_group_uuid) in mods:
      try:
        node = nodes[node_name]
      except KeyError:
        raise errors.ConfigurationError("Unable to find node '%s'" % node_name)

      if node.group == new_group_uuid:
        # Node is being assigned to its current group
        logging.debug("Node '%s' was assigned to its current group (%s)",
                      node_name, node.group)
        continue

      # Try to find current group of node
      try:
        old_group = groups[node.group]
      except KeyError:
        raise errors.ConfigurationError("Unable to find old group '%s'" %
                                        node.group)

      # Try to find new group for node
      try:
        new_group = groups[new_group_uuid]
      except KeyError:
        raise errors.ConfigurationError("Unable to find new group '%s'" %
                                        new_group_uuid)

      assert node.name in old_group.members, \
        ("Inconsistent configuration: node '%s' not listed in members for its"
         " old group '%s'" % (node.name, old_group.uuid))
      assert node.name not in new_group.members, \
        ("Inconsistent configuration: node '%s' already listed in members for"
         " its new group '%s'" % (node.name, new_group.uuid))

      resmod.append((node, old_group, new_group))

    # Apply changes
    for (node, old_group, new_group) in resmod:
      assert node.group != new_group.uuid and old_group.uuid != new_group.uuid, \
        "Assigning to current group is not possible"

      node.group = new_group.uuid

      # Update members of involved groups
      if node.name in old_group.members:
        old_group.members.remove(node.name)
      if node.name not in new_group.members:
        new_group.members.append(node.name)

    # Update timestamps and serials (only once per node/group object)
    now = time.time()
    for obj in frozenset(itertools.chain(*resmod)): # pylint: disable=W0142
      obj.serial_no += 1
      obj.mtime = now

    # Force ssconf update
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()

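  # Usage sketch (illustrative only; names and UUID are assumptions):
  # all modifications are validated before any of them is applied.
  #
  #   cfg.AssignGroupNodes([
  #     ("node1.example.com", new_group_uuid),
  #     ("node2.example.com", new_group_uuid),
  #     ])
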
  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            self._config_data.nodegroups.values() +
            [self._config_data.cluster])

  def _OpenConfig(self, accept_foreign):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    if data.cluster.master_node != self._my_hostname and not accept_foreign:
      msg = ("The configuration denotes node %s as master, while my"
             " hostname is %s; opening a foreign configuration is only"
             " possible in accept_foreign mode" %
             (data.cluster.master_node, self._my_hostname))
      raise errors.ConfigurationError(msg)

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial to -1 so that the next write will cause an
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

    self._cfg_id = utils.GetFileID(path=self._cfg_file)

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if not self._config_data.nodegroups:
      default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
      default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
                                            members=[])
      self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
      modified = True
    for node in self._config_data.nodes.values():
      if not node.group:
        node.group = self.LookupNodeGroup(None)
        modified = True
      # This is technically *not* an upgrade, but needs to be done both when
      # nodegroups are being added, and upon normally loading the config,
      # because the members list of a node group is discarded upon
      # serializing/deserializing the object.
      self._UnlockedAddNodeToGroup(node.name, node.group)
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    # TODO: Use dedicated resolver talking to config writer for name resolution
    result = \
      self._GetRpc(addr_list).call_upload_file(node_list, self._cfg_file)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    getents = self._getents()
    try:
      fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
                               close=False, gid=getents.confd_gid, mode=0640)
    except errors.LockError:
      raise errors.ConfigurationError("The configuration file has been"
                                      " modified since the last write, cannot"
                                      " update")
    try:
      self._cfg_id = utils.GetFileID(fd=fd)
    finally:
      os.close(fd)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = self._GetRpc(None).call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

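  # The GetFileID/SafeWriteFile pair above amounts to a compare-and-swap
  # on the configuration file; conceptually (a sketch, not the actual
  # utils implementation):
  #
  #   if utils.GetFileID(path=destination) != self._cfg_id:
  #     raise errors.LockError("file changed since last write")
  #   # ... write the new contents, then remember the new file ID ...
  #
  # so a concurrent modification surfaces as a clean ConfigurationError
  # instead of silently losing updates.
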
  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary mapping ssconf names to their associated
        values

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
                  self._config_data.nodegroups.values()]
    nodegroups_data = fn(utils.NiceSort(nodegroups))

    ssconf_values = {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NETMASK: str(cluster.master_netmask),
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      constants.SS_NODEGROUPS: nodegroups_data,
      }
    bad_values = [(k, v) for k, v in ssconf_values.items()
                  if not isinstance(v, basestring)]
    if bad_values:
      err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values)
      raise errors.ConfigurationError("Some ssconf key(s) have non-string"
                                      " values: %s" % err)
    return ssconf_values

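  # Format sketch (example values are assumptions): every ssconf value
  # is a single string, list-valued entries being newline-joined; e.g.
  # the SS_NODE_PRIMARY_IPS value could look like
  #
  #   node1.example.com 192.0.2.1
  #   node2.example.com 192.0.2.2
  #
  # which consumers can parse with splitlines() and split(" ", 1).
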
  @locking.ssynchronized(_config_lock, shared=1)
  def GetSsconfValues(self):
    """Wrapper using lock around L{_UnlockedGetSsconfValues}.

    """
    return self._UnlockedGetSsconfValues()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    """
    return self._config_data.cluster.drbd_usermode_helper

  @locking.ssynchronized(_config_lock)
  def SetDRBDHelper(self, drbd_helper):
    """Set DRBD usermode helper.

    """
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the MAC prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check if at least one disk of the given type is in the configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)

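  # Usage sketch (illustrative only; LD_DRBD8 is assumed to be one of
  # the logical disk type constants):
  #
  #   if cfg.HasAnyDiskOfType(constants.LD_DRBD8):
  #     # e.g. the DRBD usermode helper needs to be verified
  #     ...
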
  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetClusterInfo) has been updated and
    the caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure is saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node}, L{objects.NodeGroup} or L{objects.Instance}
        which exists in the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    elif isinstance(target, objects.NodeGroup):
      test = target in self._config_data.nodegroups.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)

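  # Usage sketch (illustrative only): the typical read-modify-write
  # cycle fetches an object, mutates it and persists it via Update()
  # while still holding the relevant locks:
  #
  #   node = cfg.GetNodeInfo("node1.example.com")
  #   node.offline = True
  #   cfg.Update(node, feedback_fn)
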
  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations.

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)