root / lib / config.py @ 4b63dc7a


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was slow and consumed too
much memory.

"""

# pylint: disable-msg=R0904
# R0904: Too many public methods

import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime


_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource: %s." %
                                    resource)
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type.

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Unable to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
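
# A minimal usage sketch of the reservation manager (illustrative only, not
# part of the module): a job running under execution context id "job-42"
# reserves a value up front and drops all its reservations when done:
#
#   trm = TemporaryReservationManager()
#   trm.Reserve("job-42", "aa:00:00:11:22:33")
#   trm.Reserved("aa:00:00:11:22:33")   # -> True until dropped
#   trm.DropECReservations("job-42")    # called on job end/error paths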


class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._getents = _getents
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = netutils.Hostname.GetSysName()
    self._last_cluster_serial = -1
    self._OpenConfig()

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one MAC address.

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac
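
  # For illustration: with the default Ganeti prefix "aa:00:00" the format
  # string above yields addresses such as "aa:00:00:3f:9a:01"; only the last
  # three octets are random, and uniqueness is enforced by GenerateMAC below,
  # not here.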

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_macs.Generate(existing, self._GenerateOneMAC, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current node, instance and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: the execution context id under which to reserve the
        generated id

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.LD_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify the configuration for consistency.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not data.cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(data.cluster.enabled_hypervisors) -
                   set(data.cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if data.cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    data.cluster.master_node)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_should, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_should:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_should))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))

    # nodegroups checks
    nodegroups_names = set()
    for nodegroup_uuid in data.nodegroups:
      nodegroup = data.nodegroups[nodegroup_uuid]
      if nodegroup.uuid != nodegroup_uuid:
        result.append("nodegroup '%s' (uuid: '%s') indexed by wrong uuid '%s'"
                      % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
      if utils.UUID_RE.match(nodegroup.name.lower()):
        result.append("nodegroup '%s' (uuid: '%s') has uuid-like name" %
                      (nodegroup.name, nodegroup.uuid))
      if nodegroup.name in nodegroups_names:
        result.append("duplicate nodegroup name '%s'" % nodegroup.name)
      else:
        nodegroups_names.add(nodegroup.name)

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(data.cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify the configuration for consistency.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device does not know node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return
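
  # Layout reminder (from the unpacking above): for a DRBD8 disk the
  # logical_id is (pnode, snode, port, pminor, sminor, secret), and the
  # physical_id computed here is, from the target node's point of view,
  # (own_ip, port, peer_ip, port, own_minor, secret) -- for example
  # ("192.0.2.1", 11000, "192.0.2.2", 11000, 0, "s3cr3t"), with addresses
  # illustrative only.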

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port
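
  # Example of the allocation policy (numbers illustrative): ports freed via
  # AddTcpUdpPort are handed out first; only when the pool is empty is
  # highest_used_port bumped, e.g. from 11000 to 11001, until
  # constants.LAST_DRBD_PORT is reached.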

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty dict), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedMinors(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, minor in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if minor in used[node]:
            duplicates.append((node, minor, instance_name, used[node][minor]))
          else:
            used[node][minor] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedMinors(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedMinors(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates
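
  # Shape of the values returned above, with hypothetical names:
  #   my_dict    == {"node1": {0: "inst1", 1: "inst2"}, "node2": {0: "inst1"}}
  #   duplicates == [("node2", 0, "inst3", "inst1")]  # minor claimed twice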

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty dict).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD minors detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD minors detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
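
  # Illustrative call (hypothetical names): allocating minors for a two-node
  # DRBD disk of instance "inst1" returns one minor per listed node, e.g.
  #   cfg.AllocateDRBDMinor(["node1", "node2"], "inst1")  # -> [0, 3]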

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPrimaryIPFamily(self):
    """Get cluster primary ip family.

    @return: primary ip family

    """
    return self._config_data.cluster.primary_ip_family

  @locking.ssynchronized(_config_lock, shared=1)
  def LookupNodeGroup(self, target):
    """Lookup a node group.

    @type target: string or None
    @param target: group name or uuid or None to look for the default
    @rtype: string
    @return: nodegroup uuid
    @raise errors.OpPrereqError: when the target group cannot be found

    """
    if target is None:
      if len(self._config_data.nodegroups) != 1:
        raise errors.OpPrereqError("More than one nodegroup exists. Target"
                                   " group must be specified explicitly.")
      else:
        return self._config_data.nodegroups.keys()[0]
    if target in self._config_data.nodegroups:
      return target
    for nodegroup in self._config_data.nodegroups.values():
      if nodegroup.name == target:
        return nodegroup.uuid
    raise errors.OpPrereqError("Nodegroup '%s' not found" % target)

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object
    @param ec_id: the execution context id for the uuid reservation

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert isinstance(status, bool), \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_up != status:
      instance.admin_up = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the status of an instance as up in the configuration.

    """
    self._SetInstanceStatus(instance_name, True)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              utils.PathJoin(file_storage_dir,
                                                             inst.name,
                                                             disk.iv_name))

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance as down in the configuration.

    """
    self._SetInstanceStatus(instance_name, False)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.instances.keys(),
                                    case_sensitive=False)

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    of an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
              GetInstanceInfo would return for the instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance
    @param ec_id: the execution context id for the uuid reservation

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._UnlockedAddNodeToGroup(node.name, node.nodegroup)
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys(),
                                    case_sensitive=False)

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
              GetNodeInfo would return for the node

    """
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                    for node in self._UnlockedGetNodeList()])
    return my_dict

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current, desired and possible master candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired as limited by possible, possible)

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained):
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum possible candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)
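
  # Worked example (numbers illustrative): on a cluster of five nodes with
  # one offline node and candidate_pool_size = 10, the possible maximum is 4;
  # with three current candidates the returned tuple is (3, 4, 4).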

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_should, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_should:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_should:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_should:
        # this should not happen
        logging.warning("MaintainCandidatePool didn't manage to fill the"
                        " candidate pool (%d/%d)", mc_now, mc_should)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

  def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1250
    """Add a given node to the specified group.
1251

1252
    """
1253
    if nodegroup_uuid not in self._config_data.nodegroups:
1254
      # This can happen if a node group gets deleted between its lookup and
1255
      # when we're adding the first node to it, since we don't keep a lock in
1256
      # the meantime. It's ok though, as we'll fail cleanly if the node group
1257
      # is not found anymore.
1258
      raise errors.OpExecError("Unknown nodegroup: %s" % nodegroup_uuid)
1259
    if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
1260
      self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
1261

    
1262
  def _UnlockedRemoveNodeFromGroup(self, node):
1263
    """Remove a given node from its group.
1264

1265
    """
1266
    nodegroup = node.nodegroup
1267
    if nodegroup not in self._config_data.nodegroups:
1268
      logging.warning("Warning: node '%s' has a non-existing nodegroup '%s'"
1269
                      " (while being removed from it)", node.name, nodegroup)
1270
    nodegroup_obj = self._config_data.nodegroups[nodegroup]
1271
    if node.name not in nodegroup_obj.members:
1272
      logging.warning("Warning: node '%s' not a member of its nodegroup '%s'"
1273
                      " (while being removed from it)", node.name, nodegroup)
1274
    else:
1275
      nodegroup_obj.members.remove(node.name)
1276

    
  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            self._config_data.nodegroups.values() +
            [self._config_data.cluster])

  def _OpenConfig(self):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, "cluster") or
        not hasattr(data.cluster, "rsahostkeypub")):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if not self._config_data.nodegroups:
      default_nodegroup_uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
      default_nodegroup = objects.NodeGroup(
          uuid=default_nodegroup_uuid,
          name="default",
          members=[],
          )
      self._config_data.nodegroups[default_nodegroup_uuid] = default_nodegroup
      modified = True
    for node in self._config_data.nodes.values():
      if not node.nodegroup:
        node.nodegroup = self.LookupNodeGroup(None)
        modified = True
      # This is technically *not* an upgrade, but needs to be done both when
      # nodegroups are being added, and upon normally loading the config,
      # because the members list of a node group is discarded upon
      # serializing/deserializing the object.
      self._UnlockedAddNodeToGroup(node.name, node.nodegroup)
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    getents = self._getents()
    utils.WriteFile(destination, data=txt, gid=getents.confd_gid, mode=0640)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = rpc.RpcRunner.call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
                  self._config_data.nodegroups.values()]
    nodegroups_data = fn(utils.NiceSort(nodegroups))

    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      constants.SS_NODEGROUPS: nodegroups_data,
      }
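
  # Each ssconf value is a newline-joined string; SS_NODE_PRIMARY_IPS, for
  # instance, contains lines of the form "node1.example.com 192.0.2.1" (one
  # per node; names and addresses illustrative only).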

  @locking.ssynchronized(_config_lock, shared=1)
  def GetSsconfValues(self):
    """Wrapper using lock around L{_UnlockedGetSsconfValues}.

    """
    return self._UnlockedGetSsconfValues()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    """
    return self._config_data.cluster.drbd_usermode_helper

  @locking.ssynchronized(_config_lock)
  def SetDRBDHelper(self, drbd_helper):
    """Set DRBD usermode helper.

    """
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the MAC prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check whether there is at least one disk of the given type in the
    configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller explicitly wants to ensure is saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)
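
  # Typical calling pattern (sketch; "cfg" is a ConfigWriter and "feedback"
  # any callable accepting a message, both hypothetical here):
  #   node = cfg.GetNodeInfo("node1.example.com")
  #   node.offline = True
  #   cfg.Update(node, feedback)   # persists and redistributes the config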

  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations.

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)