
lib/config.py @ 6f076453


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Configuration management for Ganeti
23

24
This module provides the interface to the Ganeti cluster configuration.
25

26
The configuration data is stored on every node but is updated on the master
27
only. After each update, the master distributes the data to the other nodes.
28

29
Currently, the data storage format is JSON. YAML was slow and consuming too
30
much memory.
31

32
"""
33

    
34
# pylint: disable-msg=R0904
35
# R0904: Too many public methods
36

    
37
import os
38
import random
39
import logging
40
import time
41

    
42
from ganeti import errors
43
from ganeti import locking
44
from ganeti import utils
45
from ganeti import constants
46
from ganeti import rpc
47
from ganeti import objects
48
from ganeti import serializer
49
from ganeti import uidpool
50
from ganeti import netutils
51
from ganeti import runtime
52

    
53

    
54
_config_lock = locking.SharedLock("ConfigWriter")
55

    
56
# job id used for resource management at config upgrade time
57
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
58

    
59

    
60
def _ValidateConfig(data):
61
  """Verifies that a configuration objects looks valid.
62

63
  This only verifies the version of the configuration.
64

65
  @raise errors.ConfigurationError: if the version differs from what
66
      we expect
67

68
  """
69
  if data.version != constants.CONFIG_VERSION:
70
    raise errors.ConfigurationError("Cluster configuration version"
71
                                    " mismatch, got %s instead of %s" %
72
                                    (data.version,
73
                                     constants.CONFIG_VERSION))
74

    
75

    
76
class TemporaryReservationManager:
77
  """A temporary resource reservation manager.
78

79
  This is used to reserve resources in a job, before using them, making sure
80
  other jobs cannot get them in the meantime.
81

82
  """
83
  def __init__(self):
84
    self._ec_reserved = {}
85

    
86
  def Reserved(self, resource):
87
    for holder_reserved in self._ec_reserved.values():
88
      if resource in holder_reserved:
89
        return True
90
    return False
91

    
92
  def Reserve(self, ec_id, resource):
93
    if self.Reserved(resource):
94
      raise errors.ReservationError("Duplicate reservation for resource: %s." %
95
                                    (resource))
96
    if ec_id not in self._ec_reserved:
97
      self._ec_reserved[ec_id] = set([resource])
98
    else:
99
      self._ec_reserved[ec_id].add(resource)
100

    
101
  def DropECReservations(self, ec_id):
102
    if ec_id in self._ec_reserved:
103
      del self._ec_reserved[ec_id]
104

    
105
  def GetReserved(self):
106
    all_reserved = set()
107
    for holder_reserved in self._ec_reserved.values():
108
      all_reserved.update(holder_reserved)
109
    return all_reserved
110

    
111
  def Generate(self, existing, generate_one_fn, ec_id):
112
    """Generate a new resource of this type
113

114
    """
115
    assert callable(generate_one_fn)
116

    
117
    all_elems = self.GetReserved()
118
    all_elems.update(existing)
119
    retries = 64
120
    while retries > 0:
121
      new_resource = generate_one_fn()
      retries -= 1
122
      if new_resource is not None and new_resource not in all_elems:
123
        break
124
    else:
125
      raise errors.ConfigurationError("Not able generate new resource"
126
                                      " (last tried: %s)" % new_resource)
127
    self.Reserve(ec_id, new_resource)
128
    return new_resource
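# Illustrative usage sketch; the execution context id and the MAC-like
# resource value below are made up:
#
#   trm = TemporaryReservationManager()
#   trm.Reserve("job-123", "aa:00:00:12:34:56")
#   trm.Reserved("aa:00:00:12:34:56")    # True until the reservation is dropped
#   trm.GetReserved()                    # set(["aa:00:00:12:34:56"])
#   trm.DropECReservations("job-123")    # forget everything held by job-123
#
# Generate() builds on the same mechanism: it calls generate_one_fn() until
# the produced value is neither in "existing" nor already reserved, then
# reserves it under the given ec_id.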
129

    
130

    
131
class ConfigWriter:
132
  """The interface to the cluster configuration.
133

134
  @ivar _temporary_lvs: reservation manager for temporary LVs
135
  @ivar _all_rms: a list of all temporary reservation managers
136

137
  """
138
  def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts):
139
    self.write_count = 0
140
    self._lock = _config_lock
141
    self._config_data = None
142
    self._offline = offline
143
    if cfg_file is None:
144
      self._cfg_file = constants.CLUSTER_CONF_FILE
145
    else:
146
      self._cfg_file = cfg_file
147
    self._getents = _getents
148
    self._temporary_ids = TemporaryReservationManager()
149
    self._temporary_drbds = {}
150
    self._temporary_macs = TemporaryReservationManager()
151
    self._temporary_secrets = TemporaryReservationManager()
152
    self._temporary_lvs = TemporaryReservationManager()
153
    self._all_rms = [self._temporary_ids, self._temporary_macs,
154
                     self._temporary_secrets, self._temporary_lvs]
155
    # Note: in order to prevent errors when resolving our name in
156
    # _DistributeConfig, we compute it here once and reuse it; it's
157
    # better to raise an error before starting to modify the config
158
    # file than after it was modified
159
    self._my_hostname = netutils.Hostname.GetSysName()
160
    self._last_cluster_serial = -1
161
    self._OpenConfig()
162

    
163
  # this method needs to be static, so that we can call it on the class
164
  @staticmethod
165
  def IsCluster():
166
    """Check if the cluster is configured.
167

168
    """
169
    return os.path.exists(constants.CLUSTER_CONF_FILE)
170

    
171
  def _GenerateOneMAC(self):
172
    """Generate one mac address
173

174
    """
175
    prefix = self._config_data.cluster.mac_prefix
176
    byte1 = random.randrange(0, 256)
177
    byte2 = random.randrange(0, 256)
178
    byte3 = random.randrange(0, 256)
179
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
180
    return mac
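  # Example output (values made up): with a cluster mac_prefix of "aa:00:00"
  # this returns strings such as "aa:00:00:3f:91:0c".  Uniqueness against
  # already-used MACs is enforced by GenerateMAC() below, not here.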
181

    
182
  @locking.ssynchronized(_config_lock, shared=1)
183
  def GenerateMAC(self, ec_id):
184
    """Generate a MAC for an instance.
185

186
    This should check the current instances for duplicates.
187

188
    """
189
    existing = self._AllMACs()
190
    return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
191

    
192
  @locking.ssynchronized(_config_lock, shared=1)
193
  def ReserveMAC(self, mac, ec_id):
194
    """Reserve a MAC for an instance.
195

196
    This only checks instances managed by this cluster, it does not
197
    check for potential collisions elsewhere.
198

199
    """
200
    all_macs = self._AllMACs()
201
    if mac in all_macs:
202
      raise errors.ReservationError("mac already in use")
203
    else:
204
      self._temporary_macs.Reserve(ec_id, mac)
205

    
206
  @locking.ssynchronized(_config_lock, shared=1)
207
  def ReserveLV(self, lv_name, ec_id):
208
    """Reserve an VG/LV pair for an instance.
209

210
    @type lv_name: string
211
    @param lv_name: the logical volume name to reserve
212

213
    """
214
    all_lvs = self._AllLVs()
215
    if lv_name in all_lvs:
216
      raise errors.ReservationError("LV already in use")
217
    else:
218
      self._temporary_lvs.Reserve(ec_id, lv_name)
219

    
220
  @locking.ssynchronized(_config_lock, shared=1)
221
  def GenerateDRBDSecret(self, ec_id):
222
    """Generate a DRBD secret.
223

224
    This checks the current disks for duplicates.
225

226
    """
227
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
228
                                            utils.GenerateSecret,
229
                                            ec_id)
230

    
231
  def _AllLVs(self):
232
    """Compute the list of all LVs.
233

234
    """
235
    lvnames = set()
236
    for instance in self._config_data.instances.values():
237
      node_data = instance.MapLVsByNode()
238
      for lv_list in node_data.values():
239
        lvnames.update(lv_list)
240
    return lvnames
241

    
242
  def _AllIDs(self, include_temporary):
243
    """Compute the list of all UUIDs and names we have.
244

245
    @type include_temporary: boolean
246
    @param include_temporary: whether to include the _temporary_ids set
247
    @rtype: set
248
    @return: a set of IDs
249

250
    """
251
    existing = set()
252
    if include_temporary:
253
      existing.update(self._temporary_ids.GetReserved())
254
    existing.update(self._AllLVs())
255
    existing.update(self._config_data.instances.keys())
256
    existing.update(self._config_data.nodes.keys())
257
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
258
    return existing
259

    
260
  def _GenerateUniqueID(self, ec_id):
261
    """Generate an unique UUID.
262

263
    This checks the current node, instances and disk names for
264
    duplicates.
265

266
    @rtype: string
267
    @return: the unique id
268

269
    """
270
    existing = self._AllIDs(include_temporary=False)
271
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
272

    
273
  @locking.ssynchronized(_config_lock, shared=1)
274
  def GenerateUniqueID(self, ec_id):
275
    """Generate an unique ID.
276

277
    This is just a wrapper over the unlocked version.
278

279
    @type ec_id: string
280
    @param ec_id: execution context id under which the generated id is reserved
281

282
    """
283
    return self._GenerateUniqueID(ec_id)
284

    
285
  def _AllMACs(self):
286
    """Return all MACs present in the config.
287

288
    @rtype: list
289
    @return: the list of all MACs
290

291
    """
292
    result = []
293
    for instance in self._config_data.instances.values():
294
      for nic in instance.nics:
295
        result.append(nic.mac)
296

    
297
    return result
298

    
299
  def _AllDRBDSecrets(self):
300
    """Return all DRBD secrets present in the config.
301

302
    @rtype: list
303
    @return: the list of all DRBD secrets
304

305
    """
306
    def helper(disk, result):
307
      """Recursively gather secrets from this disk."""
308
      if disk.dev_type == constants.DT_DRBD8:
309
        result.append(disk.logical_id[5])
310
      if disk.children:
311
        for child in disk.children:
312
          helper(child, result)
313

    
314
    result = []
315
    for instance in self._config_data.instances.values():
316
      for disk in instance.disks:
317
        helper(disk, result)
318

    
319
    return result
320

    
321
  def _CheckDiskIDs(self, disk, l_ids, p_ids):
322
    """Compute duplicate disk IDs
323

324
    @type disk: L{objects.Disk}
325
    @param disk: the disk at which to start searching
326
    @type l_ids: list
327
    @param l_ids: list of current logical ids
328
    @type p_ids: list
329
    @param p_ids: list of current physical ids
330
    @rtype: list
331
    @return: a list of error messages
332

333
    """
334
    result = []
335
    if disk.logical_id is not None:
336
      if disk.logical_id in l_ids:
337
        result.append("duplicate logical id %s" % str(disk.logical_id))
338
      else:
339
        l_ids.append(disk.logical_id)
340
    if disk.physical_id is not None:
341
      if disk.physical_id in p_ids:
342
        result.append("duplicate physical id %s" % str(disk.physical_id))
343
      else:
344
        p_ids.append(disk.physical_id)
345

    
346
    if disk.children:
347
      for child in disk.children:
348
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
349
    return result
350

    
351
  def _UnlockedVerifyConfig(self):
352
    """Verify function.
353

354
    @rtype: list
355
    @return: a list of error messages; a non-empty list signifies
356
        configuration errors
357

358
    """
359
    result = []
360
    seen_macs = []
361
    ports = {}
362
    data = self._config_data
363
    seen_lids = []
364
    seen_pids = []
365

    
366
    # global cluster checks
367
    if not data.cluster.enabled_hypervisors:
368
      result.append("enabled hypervisors list doesn't have any entries")
369
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
370
    if invalid_hvs:
371
      result.append("enabled hypervisors contains invalid entries: %s" %
372
                    invalid_hvs)
373
    missing_hvp = (set(data.cluster.enabled_hypervisors) -
374
                   set(data.cluster.hvparams.keys()))
375
    if missing_hvp:
376
      result.append("hypervisor parameters missing for the enabled"
377
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))
378

    
379
    if data.cluster.master_node not in data.nodes:
380
      result.append("cluster has invalid primary node '%s'" %
381
                    data.cluster.master_node)
382

    
383
    # per-instance checks
384
    for instance_name in data.instances:
385
      instance = data.instances[instance_name]
386
      if instance.name != instance_name:
387
        result.append("instance '%s' is indexed by wrong name '%s'" %
388
                      (instance.name, instance_name))
389
      if instance.primary_node not in data.nodes:
390
        result.append("instance '%s' has invalid primary node '%s'" %
391
                      (instance_name, instance.primary_node))
392
      for snode in instance.secondary_nodes:
393
        if snode not in data.nodes:
394
          result.append("instance '%s' has invalid secondary node '%s'" %
395
                        (instance_name, snode))
396
      for idx, nic in enumerate(instance.nics):
397
        if nic.mac in seen_macs:
398
          result.append("instance '%s' has NIC %d mac %s duplicate" %
399
                        (instance_name, idx, nic.mac))
400
        else:
401
          seen_macs.append(nic.mac)
402

    
403
      # gather the drbd ports for duplicate checks
404
      for dsk in instance.disks:
405
        if dsk.dev_type in constants.LDS_DRBD:
406
          tcp_port = dsk.logical_id[2]
407
          if tcp_port not in ports:
408
            ports[tcp_port] = []
409
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
410
      # gather network port reservation
411
      net_port = getattr(instance, "network_port", None)
412
      if net_port is not None:
413
        if net_port not in ports:
414
          ports[net_port] = []
415
        ports[net_port].append((instance.name, "network port"))
416

    
417
      # instance disk verify
418
      for idx, disk in enumerate(instance.disks):
419
        result.extend(["instance '%s' disk %d error: %s" %
420
                       (instance.name, idx, msg) for msg in disk.Verify()])
421
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
422

    
423
    # cluster-wide pool of free ports
424
    for free_port in data.cluster.tcpudp_port_pool:
425
      if free_port not in ports:
426
        ports[free_port] = []
427
      ports[free_port].append(("cluster", "port marked as free"))
428

    
429
    # compute tcp/udp duplicate ports
430
    keys = ports.keys()
431
    keys.sort()
432
    for pnum in keys:
433
      pdata = ports[pnum]
434
      if len(pdata) > 1:
435
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
436
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
437

    
438
    # highest used tcp port check
439
    if keys:
440
      if keys[-1] > data.cluster.highest_used_port:
441
        result.append("Highest used port mismatch, saved %s, computed %s" %
442
                      (data.cluster.highest_used_port, keys[-1]))
443

    
444
    if not data.nodes[data.cluster.master_node].master_candidate:
445
      result.append("Master node is not a master candidate")
446

    
447
    # master candidate checks
448
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
449
    if mc_now < mc_max:
450
      result.append("Not enough master candidates: actual %d, target %d" %
451
                    (mc_now, mc_max))
452

    
453
    # node checks
454
    for node_name, node in data.nodes.items():
455
      if node.name != node_name:
456
        result.append("Node '%s' is indexed by wrong name '%s'" %
457
                      (node.name, node_name))
458
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
459
        result.append("Node %s state is invalid: master_candidate=%s,"
460
                      " drain=%s, offline=%s" %
461
                      (node.name, node.master_candidate, node.drained,
462
                       node.offline))
463

    
464
    # nodegroups checks
465
    for nodegroup_uuid in data.nodegroups:
466
      nodegroup = data.nodegroups[nodegroup_uuid]
467
      if nodegroup.uuid != nodegroup_uuid:
468
        result.append("nodegroup '%s' (uuid: '%s') indexed by wrong uuid '%s'"
469
                      % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
470

    
471
    # drbd minors check
472
    _, duplicates = self._UnlockedComputeDRBDMap()
473
    for node, minor, instance_a, instance_b in duplicates:
474
      result.append("DRBD minor %d on node %s is assigned twice to instances"
475
                    " %s and %s" % (minor, node, instance_a, instance_b))
476

    
477
    # IP checks
478
    default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
479
    ips = {}
480

    
481
    def _AddIpAddress(ip, name):
482
      ips.setdefault(ip, []).append(name)
483

    
484
    _AddIpAddress(data.cluster.master_ip, "cluster_ip")
485

    
486
    for node in data.nodes.values():
487
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
488
      if node.secondary_ip != node.primary_ip:
489
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
490

    
491
    for instance in data.instances.values():
492
      for idx, nic in enumerate(instance.nics):
493
        if nic.ip is None:
494
          continue
495

    
496
        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
497
        nic_mode = nicparams[constants.NIC_MODE]
498
        nic_link = nicparams[constants.NIC_LINK]
499

    
500
        if nic_mode == constants.NIC_MODE_BRIDGED:
501
          link = "bridge:%s" % nic_link
502
        elif nic_mode == constants.NIC_MODE_ROUTED:
503
          link = "route:%s" % nic_link
504
        else:
505
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
506

    
507
        _AddIpAddress("%s/%s" % (link, nic.ip),
508
                      "instance:%s/nic:%d" % (instance.name, idx))
509

    
510
    for ip, owners in ips.items():
511
      if len(owners) > 1:
512
        result.append("IP address %s is used by multiple owners: %s" %
513
                      (ip, utils.CommaJoin(owners)))
514

    
515
    return result
516

    
517
  @locking.ssynchronized(_config_lock, shared=1)
518
  def VerifyConfig(self):
519
    """Verify function.
520

521
    This is just a wrapper over L{_UnlockedVerifyConfig}.
522

523
    @rtype: list
524
    @return: a list of error messages; a non-empty list signifies
525
        configuration errors
526

527
    """
528
    return self._UnlockedVerifyConfig()
529

    
530
  def _UnlockedSetDiskID(self, disk, node_name):
531
    """Convert the unique ID to the ID needed on the target nodes.
532

533
    This is used only for drbd, which needs ip/port configuration.
534

535
    The routine descends down and updates its children also, because
536
    this helps when only the top device is passed to the remote
537
    node.
538

539
    This function is for internal use, when the config lock is already held.
540

541
    """
542
    if disk.children:
543
      for child in disk.children:
544
        self._UnlockedSetDiskID(child, node_name)
545

    
546
    if disk.logical_id is None and disk.physical_id is not None:
547
      return
548
    if disk.dev_type == constants.LD_DRBD8:
549
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
550
      if node_name not in (pnode, snode):
551
        raise errors.ConfigurationError("DRBD device not knowing node %s" %
552
                                        node_name)
553
      pnode_info = self._UnlockedGetNodeInfo(pnode)
554
      snode_info = self._UnlockedGetNodeInfo(snode)
555
      if pnode_info is None or snode_info is None:
556
        raise errors.ConfigurationError("Can't find primary or secondary node"
557
                                        " for %s" % str(disk))
558
      p_data = (pnode_info.secondary_ip, port)
559
      s_data = (snode_info.secondary_ip, port)
560
      if pnode == node_name:
561
        disk.physical_id = p_data + s_data + (pminor, secret)
562
      else: # it must be secondary, we tested above
563
        disk.physical_id = s_data + p_data + (sminor, secret)
564
    else:
565
      disk.physical_id = disk.logical_id
566
    return
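  # Illustrative example (hypothetical names and TEST-NET addresses): a DRBD8
  # disk with
  #   logical_id  = ("node1", "node2", 11000, 0, 1, "secret")
  # set up for node1, whose secondary IP is 192.0.2.1 (peer: 192.0.2.2), gets
  #   physical_id = ("192.0.2.1", 11000, "192.0.2.2", 11000, 0, "secret")
  # i.e. (own ip, port, peer ip, port, own minor, secret) as seen from the
  # node the device is being configured on.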
567

    
568
  @locking.ssynchronized(_config_lock)
569
  def SetDiskID(self, disk, node_name):
570
    """Convert the unique ID to the ID needed on the target nodes.
571

572
    This is used only for drbd, which needs ip/port configuration.
573

574
    The routine descends down and updates its children also, because
575
    this helps when only the top device is passed to the remote
576
    node.
577

578
    """
579
    return self._UnlockedSetDiskID(disk, node_name)
580

    
581
  @locking.ssynchronized(_config_lock)
582
  def AddTcpUdpPort(self, port):
583
    """Adds a new port to the available port pool.
584

585
    """
586
    if not isinstance(port, int):
587
      raise errors.ProgrammerError("Invalid type passed for port")
588

    
589
    self._config_data.cluster.tcpudp_port_pool.add(port)
590
    self._WriteConfig()
591

    
592
  @locking.ssynchronized(_config_lock, shared=1)
593
  def GetPortList(self):
594
    """Returns a copy of the current port list.
595

596
    """
597
    return self._config_data.cluster.tcpudp_port_pool.copy()
598

    
599
  @locking.ssynchronized(_config_lock)
600
  def AllocatePort(self):
601
    """Allocate a port.
602

603
    The port will be taken from the available port pool or from the
604
    default port range (and in this case we increase
605
    highest_used_port).
606

607
    """
608
    # If there are TCP/IP ports configured, we use them first.
609
    if self._config_data.cluster.tcpudp_port_pool:
610
      port = self._config_data.cluster.tcpudp_port_pool.pop()
611
    else:
612
      port = self._config_data.cluster.highest_used_port + 1
613
      if port >= constants.LAST_DRBD_PORT:
614
        raise errors.ConfigurationError("The highest used port is greater"
615
                                        " than %s. Aborting." %
616
                                        constants.LAST_DRBD_PORT)
617
      self._config_data.cluster.highest_used_port = port
618

    
619
    self._WriteConfig()
620
    return port
621

    
622
  def _UnlockedComputeDRBDMap(self):
623
    """Compute the used DRBD minor/nodes.
624

625
    @rtype: (dict, list)
626
    @return: dictionary of node_name: dict of minor: instance_name;
627
        the returned dict will have all the nodes in it (even if with
628
        an empty dict), and a list of duplicates; if the duplicates
629
        list is not empty, the configuration is corrupted and its caller
630
        should raise an exception
631

632
    """
633
    def _AppendUsedPorts(instance_name, disk, used):
634
      duplicates = []
635
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
636
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
637
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
638
          assert node in used, ("Node '%s' of instance '%s' not found"
639
                                " in node list" % (node, instance_name))
640
          if port in used[node]:
641
            duplicates.append((node, port, instance_name, used[node][port]))
642
          else:
643
            used[node][port] = instance_name
644
      if disk.children:
645
        for child in disk.children:
646
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
647
      return duplicates
648

    
649
    duplicates = []
650
    my_dict = dict((node, {}) for node in self._config_data.nodes)
651
    for instance in self._config_data.instances.itervalues():
652
      for disk in instance.disks:
653
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
654
    for (node, minor), instance in self._temporary_drbds.iteritems():
655
      if minor in my_dict[node] and my_dict[node][minor] != instance:
656
        duplicates.append((node, minor, instance, my_dict[node][minor]))
657
      else:
658
        my_dict[node][minor] = instance
659
    return my_dict, duplicates
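  # Illustrative return value (hypothetical node/instance names):
  #   my_dict    = {"node1": {0: "inst1", 1: "inst2"},
  #                 "node2": {0: "inst1"},
  #                 "node3": {}}
  #   duplicates = []   # non-empty only when two instances claim the same
  #                     # (node, minor) pair, i.e. the config is corrupted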
660

    
661
  @locking.ssynchronized(_config_lock)
662
  def ComputeDRBDMap(self):
663
    """Compute the used DRBD minor/nodes.
664

665
    This is just a wrapper over L{_UnlockedComputeDRBDMap}.
666

667
    @return: dictionary of node_name: dict of minor: instance_name;
668
        the returned dict will have all the nodes in it (even if with
669
        an empty dict).
670

671
    """
672
    d_map, duplicates = self._UnlockedComputeDRBDMap()
673
    if duplicates:
674
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
675
                                      str(duplicates))
676
    return d_map
677

    
678
  @locking.ssynchronized(_config_lock)
679
  def AllocateDRBDMinor(self, nodes, instance):
680
    """Allocate a drbd minor.
681

682
    The free minor will be automatically computed from the existing
683
    devices. A node can be given multiple times in order to allocate
684
    multiple minors. The result is the list of minors, in the same
685
    order as the passed nodes.
686

687
    @type instance: string
688
    @param instance: the instance for which we allocate minors
689

690
    """
691
    assert isinstance(instance, basestring), \
692
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
693

    
694
    d_map, duplicates = self._UnlockedComputeDRBDMap()
695
    if duplicates:
696
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
697
                                      str(duplicates))
698
    result = []
699
    for nname in nodes:
700
      ndata = d_map[nname]
701
      if not ndata:
702
        # no minors used, we can start at 0
703
        result.append(0)
704
        ndata[0] = instance
705
        self._temporary_drbds[(nname, 0)] = instance
706
        continue
707
      keys = ndata.keys()
708
      keys.sort()
709
      ffree = utils.FirstFree(keys)
710
      if ffree is None:
711
        # return the next minor
712
        # TODO: implement high-limit check
713
        minor = keys[-1] + 1
714
      else:
715
        minor = ffree
716
      # double-check minor against current instances
717
      assert minor not in d_map[nname], \
718
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
719
              " already allocated to instance %s" %
720
              (minor, nname, d_map[nname][minor]))
721
      ndata[minor] = instance
722
      # double-check minor against reservation
723
      r_key = (nname, minor)
724
      assert r_key not in self._temporary_drbds, \
725
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
726
              " reserved for instance %s" %
727
              (minor, nname, self._temporary_drbds[r_key]))
728
      self._temporary_drbds[r_key] = instance
729
      result.append(minor)
730
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
731
                  nodes, result)
732
    return result
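  # Illustrative call (hypothetical names): allocating the two minors needed
  # by one DRBD disk of "inst1", one on each of its nodes:
  #   cfg.AllocateDRBDMinor(["node1", "node2"], "inst1")   # e.g. [0, 3]
  # Passing the same node twice would request two distinct minors on it.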
733

    
734
  def _UnlockedReleaseDRBDMinors(self, instance):
735
    """Release temporary drbd minors allocated for a given instance.
736

737
    @type instance: string
738
    @param instance: the instance for which temporary minors should be
739
                     released
740

741
    """
742
    assert isinstance(instance, basestring), \
743
           "Invalid argument passed to ReleaseDRBDMinors"
744
    for key, name in self._temporary_drbds.items():
745
      if name == instance:
746
        del self._temporary_drbds[key]
747

    
748
  @locking.ssynchronized(_config_lock)
749
  def ReleaseDRBDMinors(self, instance):
750
    """Release temporary drbd minors allocated for a given instance.
751

752
    This should be called on the error paths, on the success paths
753
    it's automatically called by the ConfigWriter add and update
754
    functions.
755

756
    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
757

758
    @type instance: string
759
    @param instance: the instance for which temporary minors should be
760
                     released
761

762
    """
763
    self._UnlockedReleaseDRBDMinors(instance)
764

    
765
  @locking.ssynchronized(_config_lock, shared=1)
766
  def GetConfigVersion(self):
767
    """Get the configuration version.
768

769
    @return: Config version
770

771
    """
772
    return self._config_data.version
773

    
774
  @locking.ssynchronized(_config_lock, shared=1)
775
  def GetClusterName(self):
776
    """Get cluster name.
777

778
    @return: Cluster name
779

780
    """
781
    return self._config_data.cluster.cluster_name
782

    
783
  @locking.ssynchronized(_config_lock, shared=1)
784
  def GetMasterNode(self):
785
    """Get the hostname of the master node for this cluster.
786

787
    @return: Master hostname
788

789
    """
790
    return self._config_data.cluster.master_node
791

    
792
  @locking.ssynchronized(_config_lock, shared=1)
793
  def GetMasterIP(self):
794
    """Get the IP of the master node for this cluster.
795

796
    @return: Master IP
797

798
    """
799
    return self._config_data.cluster.master_ip
800

    
801
  @locking.ssynchronized(_config_lock, shared=1)
802
  def GetMasterNetdev(self):
803
    """Get the master network device for this cluster.
804

805
    """
806
    return self._config_data.cluster.master_netdev
807

    
808
  @locking.ssynchronized(_config_lock, shared=1)
809
  def GetFileStorageDir(self):
810
    """Get the file storage dir for this cluster.
811

812
    """
813
    return self._config_data.cluster.file_storage_dir
814

    
815
  @locking.ssynchronized(_config_lock, shared=1)
816
  def GetHypervisorType(self):
817
    """Get the hypervisor type for this cluster.
818

819
    """
820
    return self._config_data.cluster.enabled_hypervisors[0]
821

    
822
  @locking.ssynchronized(_config_lock, shared=1)
823
  def GetHostKey(self):
824
    """Return the rsa hostkey from the config.
825

826
    @rtype: string
827
    @return: the rsa hostkey
828

829
    """
830
    return self._config_data.cluster.rsahostkeypub
831

    
832
  @locking.ssynchronized(_config_lock, shared=1)
833
  def GetDefaultIAllocator(self):
834
    """Get the default instance allocator for this cluster.
835

836
    """
837
    return self._config_data.cluster.default_iallocator
838

    
839
  @locking.ssynchronized(_config_lock, shared=1)
840
  def GetPrimaryIPFamily(self):
841
    """Get cluster primary ip family.
842

843
    @return: primary ip family
844

845
    """
846
    return self._config_data.cluster.primary_ip_family
847

    
848
  @locking.ssynchronized(_config_lock, shared=1)
849
  def LookupNodeGroup(self, target):
850
    """Lookup a node group.
851

852
    @type target: string or None
853
    @param  target: group name or uuid or None to look for the default
854
    @rtype: string
855
    @return: nodegroup uuid
856
    @raises errors.OpPrereqError: when the target group cannot be found
857

858
    """
859
    if target is None:
860
      if len(self._config_data.nodegroups) != 1:
861
        raise errors.OpPrereqError("More than one nodegroup exists. Target"
862
                                   " group must be specified explicitely.")
863
      else:
864
        return self._config_data.nodegroups.keys()[0]
865
    if target in self._config_data.nodegroups:
866
      return target
867
    for nodegroup in self._config_data.nodegroups.values():
868
      if nodegroup.name == target:
869
        return nodegroup.uuid
870
    raise errors.OpPrereqError("Nodegroup '%s' not found", target)
871

    
872
  @locking.ssynchronized(_config_lock)
873
  def AddInstance(self, instance, ec_id):
874
    """Add an instance to the config.
875

876
    This should be used after creating a new instance.
877

878
    @type instance: L{objects.Instance}
879
    @param instance: the instance object
880

881
    """
882
    if not isinstance(instance, objects.Instance):
883
      raise errors.ProgrammerError("Invalid type passed to AddInstance")
884

    
885
    if instance.disk_template != constants.DT_DISKLESS:
886
      all_lvs = instance.MapLVsByNode()
887
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
888

    
889
    all_macs = self._AllMACs()
890
    for nic in instance.nics:
891
      if nic.mac in all_macs:
892
        raise errors.ConfigurationError("Cannot add instance %s:"
893
                                        " MAC address '%s' already in use." %
894
                                        (instance.name, nic.mac))
895

    
896
    self._EnsureUUID(instance, ec_id)
897

    
898
    instance.serial_no = 1
899
    instance.ctime = instance.mtime = time.time()
900
    self._config_data.instances[instance.name] = instance
901
    self._config_data.cluster.serial_no += 1
902
    self._UnlockedReleaseDRBDMinors(instance.name)
903
    self._WriteConfig()
904

    
905
  def _EnsureUUID(self, item, ec_id):
906
    """Ensures a given object has a valid UUID.
907

908
    @param item: the instance or node to be checked
909
    @param ec_id: the execution context id for the uuid reservation
910

911
    """
912
    if not item.uuid:
913
      item.uuid = self._GenerateUniqueID(ec_id)
914
    elif item.uuid in self._AllIDs(include_temporary=True):
915
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
916
                                      " in use" % (item.name, item.uuid))
917

    
918
  def _SetInstanceStatus(self, instance_name, status):
919
    """Set the instance's status to a given value.
920

921
    """
922
    assert isinstance(status, bool), \
923
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)
924

    
925
    if instance_name not in self._config_data.instances:
926
      raise errors.ConfigurationError("Unknown instance '%s'" %
927
                                      instance_name)
928
    instance = self._config_data.instances[instance_name]
929
    if instance.admin_up != status:
930
      instance.admin_up = status
931
      instance.serial_no += 1
932
      instance.mtime = time.time()
933
      self._WriteConfig()
934

    
935
  @locking.ssynchronized(_config_lock)
936
  def MarkInstanceUp(self, instance_name):
937
    """Mark the instance status to up in the config.
938

939
    """
940
    self._SetInstanceStatus(instance_name, True)
941

    
942
  @locking.ssynchronized(_config_lock)
943
  def RemoveInstance(self, instance_name):
944
    """Remove the instance from the configuration.
945

946
    """
947
    if instance_name not in self._config_data.instances:
948
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
949
    del self._config_data.instances[instance_name]
950
    self._config_data.cluster.serial_no += 1
951
    self._WriteConfig()
952

    
953
  @locking.ssynchronized(_config_lock)
954
  def RenameInstance(self, old_name, new_name):
955
    """Rename an instance.
956

957
    This needs to be done in ConfigWriter and not by RemoveInstance
958
    combined with AddInstance as only we can guarantee an atomic
959
    rename.
960

961
    """
962
    if old_name not in self._config_data.instances:
963
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
964
    inst = self._config_data.instances[old_name]
965
    del self._config_data.instances[old_name]
966
    inst.name = new_name
967

    
968
    for disk in inst.disks:
969
      if disk.dev_type == constants.LD_FILE:
970
        # rename the file paths in logical and physical id
971
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
972
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
973
                                              utils.PathJoin(file_storage_dir,
974
                                                             inst.name,
975
                                                             disk.iv_name))
976

    
977
    self._config_data.instances[inst.name] = inst
978
    self._WriteConfig()
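  # Illustrative effect on file-based disks (hypothetical paths): a disk file
  # stored under "/srv/ganeti/file-storage/<old name>/..." is re-pointed to
  # "/srv/ganeti/file-storage/<new name>/..."; only the per-instance directory
  # component changes, the file storage directory itself is preserved.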
979

    
980
  @locking.ssynchronized(_config_lock)
981
  def MarkInstanceDown(self, instance_name):
982
    """Mark the status of an instance to down in the configuration.
983

984
    """
985
    self._SetInstanceStatus(instance_name, False)
986

    
987
  def _UnlockedGetInstanceList(self):
988
    """Get the list of instances.
989

990
    This function is for internal use, when the config lock is already held.
991

992
    """
993
    return self._config_data.instances.keys()
994

    
995
  @locking.ssynchronized(_config_lock, shared=1)
996
  def GetInstanceList(self):
997
    """Get the list of instances.
998

999
    @return: array of instances, ex. ['instance2.example.com',
1000
        'instance1.example.com']
1001

1002
    """
1003
    return self._UnlockedGetInstanceList()
1004

    
1005
  @locking.ssynchronized(_config_lock, shared=1)
1006
  def ExpandInstanceName(self, short_name):
1007
    """Attempt to expand an incomplete instance name.
1008

1009
    """
1010
    return utils.MatchNameComponent(short_name,
1011
                                    self._config_data.instances.keys(),
1012
                                    case_sensitive=False)
1013

    
1014
  def _UnlockedGetInstanceInfo(self, instance_name):
1015
    """Returns information about an instance.
1016

1017
    This function is for internal use, when the config lock is already held.
1018

1019
    """
1020
    if instance_name not in self._config_data.instances:
1021
      return None
1022

    
1023
    return self._config_data.instances[instance_name]
1024

    
1025
  @locking.ssynchronized(_config_lock, shared=1)
1026
  def GetInstanceInfo(self, instance_name):
1027
    """Returns information about an instance.
1028

1029
    It takes the information from the configuration file. Other information of
1030
    an instance is taken from the live systems.
1031

1032
    @param instance_name: name of the instance, e.g.
1033
        I{instance1.example.com}
1034

1035
    @rtype: L{objects.Instance}
1036
    @return: the instance object
1037

1038
    """
1039
    return self._UnlockedGetInstanceInfo(instance_name)
1040

    
1041
  @locking.ssynchronized(_config_lock, shared=1)
1042
  def GetAllInstancesInfo(self):
1043
    """Get the configuration of all instances.
1044

1045
    @rtype: dict
1046
    @return: dict of (instance, instance_info), where instance_info is what
1047
              GetInstanceInfo would return for the instance
1048

1049
    """
1050
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
1051
                    for instance in self._UnlockedGetInstanceList()])
1052
    return my_dict
1053

    
1054
  @locking.ssynchronized(_config_lock)
1055
  def AddNode(self, node, ec_id):
1056
    """Add a node to the configuration.
1057

1058
    @type node: L{objects.Node}
1059
    @param node: a Node instance
1060

1061
    """
1062
    logging.info("Adding node %s to configuration", node.name)
1063

    
1064
    self._EnsureUUID(node, ec_id)
1065

    
1066
    node.serial_no = 1
1067
    node.ctime = node.mtime = time.time()
1068
    self._UnlockedAddNodeToGroup(node.name, node.nodegroup)
1069
    self._config_data.nodes[node.name] = node
1070
    self._config_data.cluster.serial_no += 1
1071
    self._WriteConfig()
1072

    
1073
  @locking.ssynchronized(_config_lock)
1074
  def RemoveNode(self, node_name):
1075
    """Remove a node from the configuration.
1076

1077
    """
1078
    logging.info("Removing node %s from configuration", node_name)
1079

    
1080
    if node_name not in self._config_data.nodes:
1081
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)
1082

    
1083
    self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
1084
    del self._config_data.nodes[node_name]
1085
    self._config_data.cluster.serial_no += 1
1086
    self._WriteConfig()
1087

    
1088
  @locking.ssynchronized(_config_lock, shared=1)
1089
  def ExpandNodeName(self, short_name):
1090
    """Attempt to expand an incomplete instance name.
1091

1092
    """
1093
    return utils.MatchNameComponent(short_name,
1094
                                    self._config_data.nodes.keys(),
1095
                                    case_sensitive=False)
1096

    
1097
  def _UnlockedGetNodeInfo(self, node_name):
1098
    """Get the configuration of a node, as stored in the config.
1099

1100
    This function is for internal use, when the config lock is already
1101
    held.
1102

1103
    @param node_name: the node name, e.g. I{node1.example.com}
1104

1105
    @rtype: L{objects.Node}
1106
    @return: the node object
1107

1108
    """
1109
    if node_name not in self._config_data.nodes:
1110
      return None
1111

    
1112
    return self._config_data.nodes[node_name]
1113

    
1114
  @locking.ssynchronized(_config_lock, shared=1)
1115
  def GetNodeInfo(self, node_name):
1116
    """Get the configuration of a node, as stored in the config.
1117

1118
    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1119

1120
    @param node_name: the node name, e.g. I{node1.example.com}
1121

1122
    @rtype: L{objects.Node}
1123
    @return: the node object
1124

1125
    """
1126
    return self._UnlockedGetNodeInfo(node_name)
1127

    
1128
  def _UnlockedGetNodeList(self):
1129
    """Return the list of nodes which are in the configuration.
1130

1131
    This function is for internal use, when the config lock is already
1132
    held.
1133

1134
    @rtype: list
1135

1136
    """
1137
    return self._config_data.nodes.keys()
1138

    
1139
  @locking.ssynchronized(_config_lock, shared=1)
1140
  def GetNodeList(self):
1141
    """Return the list of nodes which are in the configuration.
1142

1143
    """
1144
    return self._UnlockedGetNodeList()
1145

    
1146
  def _UnlockedGetOnlineNodeList(self):
1147
    """Return the list of nodes which are online.
1148

1149
    """
1150
    all_nodes = [self._UnlockedGetNodeInfo(node)
1151
                 for node in self._UnlockedGetNodeList()]
1152
    return [node.name for node in all_nodes if not node.offline]
1153

    
1154
  @locking.ssynchronized(_config_lock, shared=1)
1155
  def GetOnlineNodeList(self):
1156
    """Return the list of nodes which are online.
1157

1158
    """
1159
    return self._UnlockedGetOnlineNodeList()
1160

    
1161
  @locking.ssynchronized(_config_lock, shared=1)
1162
  def GetAllNodesInfo(self):
1163
    """Get the configuration of all nodes.
1164

1165
    @rtype: dict
1166
    @return: dict of (node, node_info), where node_info is what
1167
              GetNodeInfo would return for the node
1168

1169
    """
1170
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1171
                    for node in self._UnlockedGetNodeList()])
1172
    return my_dict
1173

    
1174
  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1175
    """Get the number of current and maximum desired and possible candidates.
1176

1177
    @type exceptions: list
1178
    @param exceptions: if passed, list of nodes that should be ignored
1179
    @rtype: tuple
1180
    @return: tuple of (current, desired capped by possible, possible)
1181

1182
    """
1183
    mc_now = mc_should = mc_max = 0
1184
    for node in self._config_data.nodes.values():
1185
      if exceptions and node.name in exceptions:
1186
        continue
1187
      if not (node.offline or node.drained):
1188
        mc_max += 1
1189
      if node.master_candidate:
1190
        mc_now += 1
1191
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1192
    return (mc_now, mc_should, mc_max)
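  # Worked example (hypothetical cluster): 5 nodes, one of them offline, and
  # candidate_pool_size = 10.  Then mc_max (possible) is 4, mc_should
  # (desired, capped by possible) is min(4, 10) = 4, and mc_now is the number
  # of nodes currently flagged as master candidates.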
1193

    
1194
  @locking.ssynchronized(_config_lock, shared=1)
1195
  def GetMasterCandidateStats(self, exceptions=None):
1196
    """Get the number of current and maximum possible candidates.
1197

1198
    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1199

1200
    @type exceptions: list
1201
    @param exceptions: if passed, list of nodes that should be ignored
1202
    @rtype: tuple
1203
    @return: tuple of (current, desired capped by possible, possible)
1204

1205
    """
1206
    return self._UnlockedGetMasterCandidateStats(exceptions)
1207

    
1208
  @locking.ssynchronized(_config_lock)
1209
  def MaintainCandidatePool(self, exceptions):
1210
    """Try to grow the candidate pool to the desired size.
1211

1212
    @type exceptions: list
1213
    @param exceptions: if passed, list of nodes that should be ignored
1214
    @rtype: list
1215
    @return: list with the adjusted nodes (L{objects.Node} instances)
1216

1217
    """
1218
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1219
    mod_list = []
1220
    if mc_now < mc_max:
1221
      node_list = self._config_data.nodes.keys()
1222
      random.shuffle(node_list)
1223
      for name in node_list:
1224
        if mc_now >= mc_max:
1225
          break
1226
        node = self._config_data.nodes[name]
1227
        if (node.master_candidate or node.offline or node.drained or
1228
            node.name in exceptions):
1229
          continue
1230
        mod_list.append(node)
1231
        node.master_candidate = True
1232
        node.serial_no += 1
1233
        mc_now += 1
1234
      if mc_now != mc_max:
1235
        # this should not happen
1236
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
1237
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
1238
      if mod_list:
1239
        self._config_data.cluster.serial_no += 1
1240
        self._WriteConfig()
1241

    
1242
    return mod_list
1243

    
1244
  def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
1245
    """Add a given node to the specified group.
1246

1247
    """
1248
    if nodegroup_uuid not in self._config_data.nodegroups:
1249
      # This can happen if a node group gets deleted between its lookup and
1250
      # when we're adding the first node to it, since we don't keep a lock in
1251
      # the meantime. It's ok though, as we'll fail cleanly if the node group
1252
      # is not found anymore.
1253
      raise errors.OpExecError("Unknown nodegroup: %s" % nodegroup_uuid)
1254
    if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
1255
      self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)
1256

    
1257
  def _UnlockedRemoveNodeFromGroup(self, node):
1258
    """Remove a given node from its group.
1259

1260
    """
1261
    nodegroup = node.nodegroup
1262
    if nodegroup not in self._config_data.nodegroups:
1263
      logging.warning("Warning: node '%s' has a non-existing nodegroup '%s'"
1264
                      " (while being removed from it)", node.name, nodegroup)
1265
    nodegroup_obj = self._config_data.nodegroups[nodegroup]
1266
    if node.name not in nodegroup_obj.members:
1267
      logging.warning("Warning: node '%s' not a member of its nodegroup '%s'"
1268
                      " (while being removed from it)", node.name, nodegroup)
1269
    else:
1270
      nodegroup_obj.members.remove(node.name)
1271

    
1272
  def _BumpSerialNo(self):
1273
    """Bump up the serial number of the config.
1274

1275
    """
1276
    self._config_data.serial_no += 1
1277
    self._config_data.mtime = time.time()
1278

    
1279
  def _AllUUIDObjects(self):
1280
    """Returns all objects with uuid attributes.
1281

1282
    """
1283
    return (self._config_data.instances.values() +
1284
            self._config_data.nodes.values() +
1285
            self._config_data.nodegroups.values() +
1286
            [self._config_data.cluster])
1287

    
1288
  def _OpenConfig(self):
1289
    """Read the config data from disk.
1290

1291
    """
1292
    raw_data = utils.ReadFile(self._cfg_file)
1293

    
1294
    try:
1295
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1296
    except Exception, err:
1297
      raise errors.ConfigurationError(err)
1298

    
1299
    # Make sure the configuration has the right version
1300
    _ValidateConfig(data)
1301

    
1302
    if (not hasattr(data, 'cluster') or
1303
        not hasattr(data.cluster, 'rsahostkeypub')):
1304
      raise errors.ConfigurationError("Incomplete configuration"
1305
                                      " (missing cluster.rsahostkeypub)")
1306

    
1307
    # Upgrade configuration if needed
1308
    data.UpgradeConfig()
1309

    
1310
    self._config_data = data
1311
    # reset the last serial as -1 so that the next write will cause
1312
    # ssconf update
1313
    self._last_cluster_serial = -1
1314

    
1315
    # And finally run our (custom) config upgrade sequence
1316
    self._UpgradeConfig()
1317

    
1318
  def _UpgradeConfig(self):
1319
    """Run upgrade steps that cannot be done purely in the objects.
1320

1321
    This is because some data elements need uniqueness across the
1322
    whole configuration, etc.
1323

1324
    @warning: this function will call L{_WriteConfig()}, but also
1325
        L{DropECReservations} so it needs to be called only from a
1326
        "safe" place (the constructor). If one wanted to call it with
1327
        the lock held, a DropECReservationUnlocked would need to be
1328
        created first, to avoid causing deadlock.
1329

1330
    """
1331
    modified = False
1332
    for item in self._AllUUIDObjects():
1333
      if item.uuid is None:
1334
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1335
        modified = True
1336
    if not self._config_data.nodegroups:
1337
      default_nodegroup_uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1338
      default_nodegroup = objects.NodeGroup(
1339
          uuid=default_nodegroup_uuid,
1340
          name="default",
1341
          members=[],
1342
          )
1343
      self._config_data.nodegroups[default_nodegroup_uuid] = default_nodegroup
1344
      modified = True
1345
    for node in self._config_data.nodes.values():
1346
      if not node.nodegroup:
1347
        node.nodegroup = self.LookupNodeGroup(None)
1348
        modified = True
1349
      # This is technically *not* an upgrade, but needs to be done both when
1350
      # nodegroups are being added, and upon normally loading the config,
1351
      # because the members list of a node group is discarded upon
1352
      # serializing/deserializing the object.
1353
      self._UnlockedAddNodeToGroup(node.name, node.nodegroup)
1354
    if modified:
1355
      self._WriteConfig()
1356
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1357
      # only called at config init time, without the lock held
1358
      self.DropECReservations(_UPGRADE_CONFIG_JID)
1359

    
1360
  def _DistributeConfig(self, feedback_fn):
1361
    """Distribute the configuration to the other nodes.
1362

1363
    Currently, this only copies the configuration file. In the future,
1364
    it could be used to encapsulate the 2/3-phase update mechanism.
1365

1366
    """
1367
    if self._offline:
1368
      return True
1369

    
1370
    bad = False
1371

    
1372
    node_list = []
1373
    addr_list = []
1374
    myhostname = self._my_hostname
1375
    # we can skip checking whether _UnlockedGetNodeInfo returns None
1376
    # since the node list comes from _UnlockedGetNodeList, and we are
1377
    # called with the lock held, so no modifications should take place
1378
    # in between
1379
    for node_name in self._UnlockedGetNodeList():
1380
      if node_name == myhostname:
1381
        continue
1382
      node_info = self._UnlockedGetNodeInfo(node_name)
1383
      if not node_info.master_candidate:
1384
        continue
1385
      node_list.append(node_info.name)
1386
      addr_list.append(node_info.primary_ip)
1387

    
1388
    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1389
                                            address_list=addr_list)
1390
    for to_node, to_result in result.items():
1391
      msg = to_result.fail_msg
1392
      if msg:
1393
        msg = ("Copy of file %s to node %s failed: %s" %
1394
               (self._cfg_file, to_node, msg))
1395
        logging.error(msg)
1396

    
1397
        if feedback_fn:
1398
          feedback_fn(msg)
1399

    
1400
        bad = True
1401

    
1402
    return not bad
1403

    
1404
  def _WriteConfig(self, destination=None, feedback_fn=None):
1405
    """Write the configuration data to persistent storage.
1406

1407
    """
1408
    assert feedback_fn is None or callable(feedback_fn)
1409

    
1410
    # Warn on config errors, but don't abort the save - the
1411
    # configuration has already been modified, and we can't revert;
1412
    # the best we can do is to warn the user and save as is, leaving
1413
    # recovery to the user
1414
    config_errors = self._UnlockedVerifyConfig()
1415
    if config_errors:
1416
      errmsg = ("Configuration data is not consistent: %s" %
1417
                (utils.CommaJoin(config_errors)))
1418
      logging.critical(errmsg)
1419
      if feedback_fn:
1420
        feedback_fn(errmsg)
1421

    
1422
    if destination is None:
1423
      destination = self._cfg_file
1424
    self._BumpSerialNo()
1425
    txt = serializer.Dump(self._config_data.ToDict())
1426

    
1427
    getents = self._getents()
1428
    utils.WriteFile(destination, data=txt, gid=getents.confd_gid, mode=0640)
1429

    
1430
    self.write_count += 1
1431

    
1432
    # and redistribute the config file to master candidates
1433
    self._DistributeConfig(feedback_fn)
1434

    
1435
    # Write ssconf files on all nodes (including locally)
1436
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
1437
      if not self._offline:
1438
        result = rpc.RpcRunner.call_write_ssconf_files(
1439
          self._UnlockedGetOnlineNodeList(),
1440
          self._UnlockedGetSsconfValues())
1441

    
1442
        for nname, nresu in result.items():
1443
          msg = nresu.fail_msg
1444
          if msg:
1445
            errmsg = ("Error while uploading ssconf files to"
1446
                      " node %s: %s" % (nname, msg))
1447
            logging.warning(errmsg)
1448

    
1449
            if feedback_fn:
1450
              feedback_fn(errmsg)
1451

    
1452
      self._last_cluster_serial = self._config_data.cluster.serial_no
1453

    
1454
  def _UnlockedGetSsconfValues(self):
1455
    """Return the values needed by ssconf.
1456

1457
    @rtype: dict
1458
    @return: a dictionary with keys the ssconf names and values their
1459
        associated value
1460

1461
    """
1462
    fn = "\n".join
1463
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1464
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
1465
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1466
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1467
                    for ninfo in node_info]
1468
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1469
                    for ninfo in node_info]
1470

    
1471
    instance_data = fn(instance_names)
1472
    off_data = fn(node.name for node in node_info if node.offline)
1473
    on_data = fn(node.name for node in node_info if not node.offline)
1474
    mc_data = fn(node.name for node in node_info if node.master_candidate)
1475
    mc_ips_data = fn(node.primary_ip for node in node_info
1476
                     if node.master_candidate)
1477
    node_data = fn(node_names)
1478
    node_pri_ips_data = fn(node_pri_ips)
1479
    node_snd_ips_data = fn(node_snd_ips)
1480

    
1481
    cluster = self._config_data.cluster
1482
    cluster_tags = fn(cluster.GetTags())
1483

    
1484
    hypervisor_list = fn(cluster.enabled_hypervisors)
1485

    
1486
    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")
1487

    
1488
    nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
1489
                  self._config_data.nodegroups.values()]
1490
    nodegroups_data = fn(utils.NiceSort(nodegroups))
1491

    
1492
    return {
1493
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
1494
      constants.SS_CLUSTER_TAGS: cluster_tags,
1495
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1496
      constants.SS_MASTER_CANDIDATES: mc_data,
1497
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1498
      constants.SS_MASTER_IP: cluster.master_ip,
1499
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
1500
      constants.SS_MASTER_NODE: cluster.master_node,
1501
      constants.SS_NODE_LIST: node_data,
1502
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1503
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1504
      constants.SS_OFFLINE_NODES: off_data,
1505
      constants.SS_ONLINE_NODES: on_data,
1506
      constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
1507
      constants.SS_INSTANCE_LIST: instance_data,
1508
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1509
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
1510
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
1511
      constants.SS_UID_POOL: uid_pool,
1512
      constants.SS_NODEGROUPS: nodegroups_data,
1513
      }
1514

    
1515
  @locking.ssynchronized(_config_lock, shared=1)
1516
  def GetSsconfValues(self):
1517
    """Wrapper using lock around _UnlockedGetSsconf().
1518

1519
    """
1520
    return self._UnlockedGetSsconfValues()
1521

    
1522
  @locking.ssynchronized(_config_lock, shared=1)
1523
  def GetVGName(self):
1524
    """Return the volume group name.
1525

1526
    """
1527
    return self._config_data.cluster.volume_group_name
1528

    
1529
  @locking.ssynchronized(_config_lock)
1530
  def SetVGName(self, vg_name):
1531
    """Set the volume group name.
1532

1533
    """
1534
    self._config_data.cluster.volume_group_name = vg_name
1535
    self._config_data.cluster.serial_no += 1
1536
    self._WriteConfig()
1537

    
1538
  @locking.ssynchronized(_config_lock, shared=1)
1539
  def GetDRBDHelper(self):
1540
    """Return DRBD usermode helper.
1541

1542
    """
1543
    return self._config_data.cluster.drbd_usermode_helper
1544

    
1545
  @locking.ssynchronized(_config_lock)
1546
  def SetDRBDHelper(self, drbd_helper):
1547
    """Set DRBD usermode helper.
1548

1549
    """
1550
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
1551
    self._config_data.cluster.serial_no += 1
1552
    self._WriteConfig()
1553

    
1554
  @locking.ssynchronized(_config_lock, shared=1)
1555
  def GetMACPrefix(self):
1556
    """Return the mac prefix.
1557

1558
    """
1559
    return self._config_data.cluster.mac_prefix
1560

    
1561
  @locking.ssynchronized(_config_lock, shared=1)
1562
  def GetClusterInfo(self):
1563
    """Returns information about the cluster
1564

1565
    @rtype: L{objects.Cluster}
1566
    @return: the cluster object
1567

1568
    """
1569
    return self._config_data.cluster
1570

    
1571
  @locking.ssynchronized(_config_lock, shared=1)
1572
  def HasAnyDiskOfType(self, dev_type):
1573
    """Check if in there is at disk of the given type in the configuration.
1574

1575
    """
1576
    return self._config_data.HasAnyDiskOfType(dev_type)
1577

    
1578
  @locking.ssynchronized(_config_lock)
1579
  def Update(self, target, feedback_fn):
1580
    """Notify function to be called after updates.
1581

1582
    This function must be called when an object (as returned by
1583
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1584
    caller wants the modifications saved to the backing store. Note
1585
    that all modified objects will be saved, but the target argument
1586
    is the one the caller wants to ensure is saved.
1587

1588
    @param target: an instance of either L{objects.Cluster},
1589
        L{objects.Node} or L{objects.Instance} which is existing in
1590
        the cluster
1591
    @param feedback_fn: Callable feedback function
1592

1593
    """
1594
    if self._config_data is None:
1595
      raise errors.ProgrammerError("Configuration file not read,"
1596
                                   " cannot save.")
1597
    update_serial = False
1598
    if isinstance(target, objects.Cluster):
1599
      test = target == self._config_data.cluster
1600
    elif isinstance(target, objects.Node):
1601
      test = target in self._config_data.nodes.values()
1602
      update_serial = True
1603
    elif isinstance(target, objects.Instance):
1604
      test = target in self._config_data.instances.values()
1605
    else:
1606
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
1607
                                   " ConfigWriter.Update" % type(target))
1608
    if not test:
1609
      raise errors.ConfigurationError("Configuration updated since object"
1610
                                      " has been read or unknown object")
1611
    target.serial_no += 1
1612
    target.mtime = now = time.time()
1613

    
1614
    if update_serial:
1615
      # for node updates, we need to increase the cluster serial too
1616
      self._config_data.cluster.serial_no += 1
1617
      self._config_data.cluster.mtime = now
1618

    
1619
    if isinstance(target, objects.Instance):
1620
      self._UnlockedReleaseDRBDMinors(target.name)
1621

    
1622
    self._WriteConfig(feedback_fn=feedback_fn)
1623

    
1624
  @locking.ssynchronized(_config_lock)
1625
  def DropECReservations(self, ec_id):
1626
    """Drop per-execution-context reservations
1627

1628
    """
1629
    for rm in self._all_rms:
1630
      rm.DropECReservations(ec_id)