Statistics
| Branch: | Tag: | Revision:

root / lib / config.py @ 26b316d0

History | View | Annotate | Download (40.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Configuration management for Ganeti
23

24
This module provides the interface to the Ganeti cluster configuration.
25

26
The configuration data is stored on every node but is updated on the master
27
only. After each update, the master distributes the data to the other nodes.
28

29
Currently, the data storage format is JSON. YAML was slow and consuming too
30
much memory.
31

32
"""
33

    
34
import os
35
import tempfile
36
import random
37
import logging
38

    
39
from ganeti import errors
40
from ganeti import locking
41
from ganeti import utils
42
from ganeti import constants
43
from ganeti import rpc
44
from ganeti import objects
45
from ganeti import serializer
46

    
47

    
48
_config_lock = locking.SharedLock()
49

    
50

    
51
def _ValidateConfig(data):
  """Check that a configuration object carries the expected version.

  Only the version field is inspected; no other sanity checks are
  performed here.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  found = data.version
  expected = constants.CONFIG_VERSION
  if found == expected:
    return
  raise errors.ConfigurationError("Cluster configuration version"
                                  " mismatch, got %s instead of %s" %
                                  (found, expected))
65

    
66

    
67
class ConfigWriter:
68
  """The interface to the cluster configuration.
69

70
  """
71
  def __init__(self, cfg_file=None, offline=False):
    """Initialize the config writer and load the configuration.

    @type cfg_file: string or None
    @param cfg_file: path to the configuration file to use; when None,
        the default cluster configuration file path is used
    @type offline: boolean
    @param offline: stored as-is; presumably controls whether config
        changes are distributed to other nodes -- confirm in the write
        paths, which are outside this view

    """
    # number of completed config writes, starts at zero
    self.write_count = 0
    self._lock = _config_lock
    # the in-memory configuration; filled in by _OpenConfig() below
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    # reservations not yet committed to the config file:
    # generated-but-unused unique IDs, (node, minor) DRBD slots, MACs
    self._temporary_ids = set()
    self._temporary_drbds = {}
    self._temporary_macs = set()
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = utils.HostInfo().name
    # -1 marks "no cluster serial seen yet"
    self._last_cluster_serial = -1
    self._OpenConfig()
90

    
91
  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Tell whether this node holds a cluster configuration.

    @rtype: boolean
    @return: True if the cluster configuration file exists on disk

    """
    cfg_path = constants.CLUSTER_CONF_FILE
    return os.path.exists(cfg_path)
98

    
99
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self):
    """Generate a MAC address unique within this cluster.

    The address uses the cluster's MAC prefix; the three random octets
    are re-drawn (up to 64 times) until the address collides with
    neither a configured instance NIC nor a pending reservation.

    @rtype: string
    @return: the generated MAC, also recorded as a temporary reservation

    """
    prefix = self._config_data.cluster.mac_prefix
    used = self._AllMACs()
    for _ in range(64):
      mac = "%s:%02x:%02x:%02x" % (prefix,
                                   random.randrange(0, 256),
                                   random.randrange(0, 256),
                                   random.randrange(0, 256))
      if mac not in used and mac not in self._temporary_macs:
        # reserve it so concurrent callers can't hand it out again
        self._temporary_macs.add(mac)
        return mac
    raise errors.ConfigurationError("Can't generate unique MAC")
121

    
122
  @locking.ssynchronized(_config_lock, shared=1)
  def IsMacInUse(self, mac):
    """Predicate: check if the specified MAC is in use in the Ganeti cluster.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    @rtype: boolean

    """
    in_config = mac in self._AllMACs()
    return in_config or mac in self._temporary_macs
132

    
133
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self):
    """Generate a DRBD secret not already used by any configured disk.

    Up to 64 candidates are drawn before giving up.

    @rtype: string
    @raise errors.ConfigurationError: if no unique secret could be found

    """
    in_use = self._AllDRBDSecrets()
    for _ in range(64):
      candidate = utils.GenerateSecret()
      if candidate not in in_use:
        return candidate
    raise errors.ConfigurationError("Can't generate unique DRBD secret")
150

    
151
  def _AllLVs(self):
    """Compute the set of all LV names used by configured instances.

    @rtype: set

    """
    lvs = set()
    for inst in self._config_data.instances.values():
      for lv_list in inst.MapLVsByNode().values():
        lvs.update(lv_list)
    return lvs
161

    
162
  def _AllIDs(self, include_temporary):
    """Compute the set of all UUIDs and names we have.

    Combines LV names, instance names and node names, optionally
    adding the not-yet-committed temporary IDs.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    ids = set(self._AllLVs())
    ids.update(self._config_data.instances.keys())
    ids.update(self._config_data.nodes.keys())
    if include_temporary:
      ids.update(self._temporary_ids)
    return ids
178

    
179
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, exceptions=None):
    """Generate an unique disk name.

    This checks the current node, instances and disk names for
    duplicates.

    @param exceptions: a list with some other names which should be checked
        for uniqueness (used for example when you want to get
        more than one id at one time without adding each one in
        turn to the config file)

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=True)
    if exceptions is not None:
      existing.update(exceptions)
    retries = 64
    unique_id = None
    while retries > 0:
      unique_id = utils.NewUUID()
      # check for None first: membership testing on a None id is moot
      if unique_id is not None and unique_id not in existing:
        break
      # BUGFIX: the counter was never decremented, so repeated
      # collisions would spin forever instead of reaching the else:
      retries -= 1
    else:
      # also fixed: missing "to" and unbalanced parenthesis in message
      raise errors.ConfigurationError("Not able to generate an unique ID"
                                      " (last tried ID: %s)" % unique_id)
    # reserve the id until it is committed to the configuration
    self._temporary_ids.add(unique_id)
    return unique_id
208

    
209
  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    macs = []
    for inst in self._config_data.instances.values():
      macs.extend([nic.mac for nic in inst.nics])
    return macs
222

    
223
  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def _Collect(dev, acc):
      """Recursively gather secrets from this disk."""
      if dev.dev_type == constants.DT_DRBD8:
        # DRBD8 devices carry the shared secret in slot 5 of logical_id
        acc.append(dev.logical_id[5])
      for child in (dev.children or []):
        _Collect(child, acc)

    secrets = []
    for inst in self._config_data.instances.values():
      for dev in inst.disks:
        _Collect(dev, secrets)
    return secrets
244

    
245
  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    Walks the disk tree rooted at C{disk}; both id lists are mutated in
    place so repeated calls accumulate the already-seen ids.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    errs = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        errs.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        errs.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    for child in (disk.children or []):
      errs.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return errs
274

    
275
  def _UnlockedVerifyConfig(self):
    """Verify function.

    Performs cluster-wide, per-instance and per-node sanity checks on
    the in-memory configuration; must be called with the config lock
    held (any mode).

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not data.cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)

    if data.cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    data.cluster.master_node)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = ", ".join(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node in data.nodes.values():
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        # BUGFIX: the message formatting used node.drain, which is not an
        # attribute (the test above uses node.drained); reporting an
        # invalid node state would itself raise AttributeError
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))

    # drbd minors check
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    return result
384

    
385
  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify the configuration, holding the config lock in shared mode.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    errors_found = self._UnlockedVerifyConfig()
    return errors_found
397

    
398
  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when the only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    @param disk: the disk object to update (mutated in place)
    @param node_name: the node from whose point of view the physical id
        is computed; must be the disk's primary or secondary node
    @raise errors.ConfigurationError: if node_name is neither of the
        DRBD peers, or a peer node is missing from the configuration

    """
    # children first, so the whole tree is updated even when only the
    # top-level device is later shipped to the remote node
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    # nothing to derive the physical id from, but one is already set
    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      # each endpoint is addressed via its secondary (replication) IP
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      # local endpoint first, then the peer, then our minor and secret
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      # non-DRBD devices: physical id is simply the logical id
      disk.physical_id = disk.logical_id
    return
435

    
436
  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when the only the top device is passed to the remote
    node.

    Locked wrapper over L{_UnlockedSetDiskID}.

    """
    outcome = self._UnlockedSetDiskID(disk, node_name)
    return outcome
448

    
449
  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    @type port: int
    @param port: the port to return to the free pool

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    pool = self._config_data.cluster.tcpudp_port_pool
    pool.add(port)
    self._WriteConfig()
459

    
460
  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port pool.

    """
    pool = self._config_data.cluster.tcpudp_port_pool
    return pool.copy()
466

    
467
  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    cluster = self._config_data.cluster
    if cluster.tcpudp_port_pool:
      # If there are TCP/IP ports configured, we use them first.
      port = cluster.tcpudp_port_pool.pop()
    else:
      port = cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      cluster.highest_used_port = port

    self._WriteConfig()
    return port
489

    
490
  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      # Recursively record this disk tree's DRBD minors in 'used',
      # returning any (node, minor, new_owner, old_owner) clashes found.
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    # start with an (empty) entry for every node, as promised above
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    # merge the not-yet-committed reservations on top of the config data
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates
528

    
529
  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap} which
    additionally treats duplicate assignments as a fatal error.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list).
    @raise errors.ConfigurationError: if duplicate minors are found

    """
    minors, clashes = self._UnlockedComputeDRBDMap()
    if clashes:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(clashes))
    return minors
545

    
546
  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    # refuse to allocate on top of an already-corrupted map
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      # reuse the lowest gap in the minor sequence, if any
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      # record in the working map so repeated nodes get distinct minors
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      # reserve until the instance is committed to the configuration
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
601

    
602
  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    # .items() returns a snapshot, so deleting while iterating is safe
    for reservation, owner in self._temporary_drbds.items():
      if owner == instance:
        del self._temporary_drbds[reservation]
615

    
616
  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)
632

    
633
  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    data = self._config_data
    return data.version
641

    
642
  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    cluster = self._config_data.cluster
    return cluster.cluster_name
650

    
651
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    cluster = self._config_data.cluster
    return cluster.master_node
659

    
660
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    cluster = self._config_data.cluster
    return cluster.master_ip
668

    
669
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    cluster = self._config_data.cluster
    return cluster.master_netdev
675

    
676
  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    cluster = self._config_data.cluster
    return cluster.file_storage_dir
682

    
683
  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the default hypervisor type for this cluster.

    """
    cluster = self._config_data.cluster
    return cluster.default_hypervisor
689

    
690
  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    cluster = self._config_data.cluster
    return cluster.rsahostkeypub
699

    
700
  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      lv_map = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, lv_map)

    # refuse to add an instance whose MACs collide with existing ones
    existing_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac not in existing_macs:
        continue
      raise errors.ConfigurationError("Cannot add instance %s:"
        " MAC address '%s' already in use." % (instance.name, nic.mac))

    instance.serial_no = 1
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    # the instance is committed now: drop its temporary reservations
    self._UnlockedReleaseDRBDMinors(instance.name)
    for nic in instance.nics:
      self._temporary_macs.discard(nic.mac)
    self._WriteConfig()
730

    
731
  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    Writes the configuration only when the status actually changes.

    """
    assert isinstance(status, bool), \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    try:
      inst = self._config_data.instances[instance_name]
    except KeyError:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    if inst.admin_up == status:
      return
    inst.admin_up = status
    inst.serial_no += 1
    self._WriteConfig()
746

    
747
  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    """
    self._SetInstanceStatus(instance_name, status=True)
753

    
754
  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    instances = self._config_data.instances
    if instance_name not in instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    del instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
764

    
765
  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    try:
      inst = self._config_data.instances.pop(old_name)
    except KeyError:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type != constants.LD_FILE:
        continue
      # rename the file paths in logical and physical id
      file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
      new_path = os.path.join(file_storage_dir, inst.name, disk.iv_name)
      disk.physical_id = disk.logical_id = (disk.logical_id[0], new_path)

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()
791

    
792
  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    """
    self._SetInstanceStatus(instance_name, status=False)
798

    
799
  def _UnlockedGetInstanceList(self):
    """Get the list of instance names.

    This function is for internal use, when the config lock is already held.

    """
    instances = self._config_data.instances
    return instances.keys()
806

    
807
  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    names = self._UnlockedGetInstanceList()
    return names
816

    
817
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    candidates = self._config_data.instances.keys()
    return utils.MatchNameComponent(short_name, candidates)
824

    
825
  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance, or None if unknown.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.get(instance_name, None)
835

    
836
  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information of
    an instance are taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    info = self._UnlockedGetInstanceInfo(instance_name)
    return info
851

    
852
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
              would GetInstanceInfo return for the node

    """
    names = self._UnlockedGetInstanceList()
    return dict([(name, self._UnlockedGetInstanceInfo(name))
                 for name in names])
864

    
865
  @locking.ssynchronized(_config_lock)
  def AddNode(self, node):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    # pass the node name as a lazy %-argument instead of pre-formatting
    # the message with "%"; the message is then only built if emitted
    logging.info("Adding node %s to configuration", node.name)

    node.serial_no = 1
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    @param node_name: the name of the node to remove

    @raise errors.ConfigurationError: if the node is not in the
        configuration

    """
    # pass the node name as a lazy %-argument instead of pre-formatting
    # the message with "%"; the message is then only built if emitted
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    # the docstring previously said "instance name", but this method
    # matches against the configured node names
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys())
  def _UnlockedGetNodeInfo(self, node_name):
903
    """Get the configuration of a node, as stored in the config.
904

905
    This function is for internal use, when the config lock is already
906
    held.
907

908
    @param node_name: the node name, e.g. I{node1.example.com}
909

910
    @rtype: L{objects.Node}
911
    @return: the node object
912

913
    """
914
    if node_name not in self._config_data.nodes:
915
      return None
916

    
917
    return self._config_data.nodes[node_name]
918

    
919

    
920
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)
  def _UnlockedGetNodeList(self):
935
    """Return the list of nodes which are in the configuration.
936

937
    This function is for internal use, when the config lock is already
938
    held.
939

940
    @rtype: list
941

942
    """
943
    return self._config_data.nodes.keys()
944

    
945

    
946
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This is just a locked wrapper over L{_UnlockedGetNodeList}.

    """
    return self._UnlockedGetNodeList()
  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    online = []
    for name in self._UnlockedGetNodeList():
      node = self._UnlockedGetNodeInfo(name)
      if not node.offline:
        online.append(node.name)
    return online
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
              would GetNodeInfo return for the node

    """
    return dict((name, self._UnlockedGetNodeInfo(name))
                for name in self._UnlockedGetNodeList())
  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
976
    """Get the number of current and maximum desired and possible candidates.
977

978
    @type exceptions: list
979
    @param exceptions: if passed, list of nodes that should be ignored
980
    @rtype: tuple
981
    @return: tuple of (current, desired and possible)
982

983
    """
984
    mc_now = mc_max = 0
985
    for node in self._config_data.nodes.values():
986
      if exceptions and node.name in exceptions:
987
        continue
988
      if not (node.offline or node.drained):
989
        mc_max += 1
990
      if node.master_candidate:
991
        mc_now += 1
992
    mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
993
    return (mc_now, mc_max)
994

    
995
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum possible candidates.

    This is just a locked wrapper over
    L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, max)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)
  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self):
    """Try to grow the candidate pool to the desired size.

    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
    mod_list = []
    if mc_now < mc_max:
      # promote eligible nodes in random order until the pool is full
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        node = self._config_data.nodes[name]
        if node.master_candidate or node.offline or node.drained:
          continue
        node.master_candidate = True
        node.serial_no += 1
        mod_list.append(node)
        mc_now += 1
        if mc_now >= mc_max:
          break
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list
  def _BumpSerialNo(self):
1043
    """Bump up the serial number of the config.
1044

1045
    """
1046
    self._config_data.serial_no += 1
1047

    
1048
  def _OpenConfig(self):
1049
    """Read the config data from disk.
1050

1051
    """
1052
    f = open(self._cfg_file, 'r')
1053
    try:
1054
      try:
1055
        data = objects.ConfigData.FromDict(serializer.Load(f.read()))
1056
      except Exception, err:
1057
        raise errors.ConfigurationError(err)
1058
    finally:
1059
      f.close()
1060

    
1061
    # Make sure the configuration has the right version
1062
    _ValidateConfig(data)
1063

    
1064
    if (not hasattr(data, 'cluster') or
1065
        not hasattr(data.cluster, 'rsahostkeypub')):
1066
      raise errors.ConfigurationError("Incomplete configuration"
1067
                                      " (missing cluster.rsahostkeypub)")
1068

    
1069
    # Upgrade configuration if needed
1070
    data.UpgradeConfig()
1071

    
1072
    self._config_data = data
1073
    # reset the last serial as -1 so that the next write will cause
1074
    # ssconf update
1075
    self._last_cluster_serial = -1
1076

    
1077
  def _DistributeConfig(self):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    @return: True if the distribution succeeded on all target nodes

    """
    if self._offline:
      return True

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # _UnlockedGetNodeInfo cannot return None here: the names come from
    # _UnlockedGetNodeList, and since we are called with the lock held,
    # no modifications can take place in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    bad = False
    for node in node_list:
      if not result[node]:
        logging.error("copy of file %s to node %s failed",
                      self._cfg_file, node)
        bad = True
    return not bad
  def _WriteConfig(self, destination=None):
    """Write the configuration data to persistent storage.

    @param destination: if passed, an alternative path to write to;
        by default the standard config file is used

    @raise errors.ConfigurationError: if the in-memory configuration
        is not consistent

    """
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      raise errors.ConfigurationError("Configuration data is not"
                                      " consistent: %s" %
                                      (", ".join(config_errors)))
    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())
    dir_name, file_name = os.path.split(destination)
    fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
    try:
      f = os.fdopen(fd, 'w')
      try:
        f.write(txt)
        os.fsync(f.fileno())
      finally:
        # we don't need to do os.close(fd) as f.close() did it
        f.close()
      # the rename makes the update atomic on POSIX filesystems
      os.rename(name, destination)
    except:
      # don't leave a stale temporary file behind on any failure
      # (mkstemp's caller is responsible for removing the file)
      try:
        os.unlink(name)
      except OSError:
        pass
      raise
    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig()

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
                                              self._UnlockedGetSsconfValues())
      self._last_cluster_serial = self._config_data.cluster.serial_no
  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    Internal helper; the caller must already hold the config lock.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]

    instance_data = "\n".join(instance_names)
    off_data = "\n".join(node.name for node in node_info if node.offline)
    on_data = "\n".join(node.name for node in node_info if not node.offline)
    mc_data = "\n".join(node.name for node in node_info
                        if node.master_candidate)
    node_data = "\n".join(node_names)

    cluster = self._config_data.cluster
    cluster_tags = "\n".join(cluster.GetTags())
    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      }
  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    cluster = self._config_data.cluster
    return cluster.volume_group_name
  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    cluster = self._config_data.cluster
    cluster.volume_group_name = vg_name
    cluster.serial_no += 1
    self._WriteConfig()
  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefBridge(self):
    """Return the default bridge.

    """
    cluster = self._config_data.cluster
    return cluster.default_bridge
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the mac prefix.

    """
    cluster = self._config_data.cluster
    return cluster.mac_prefix
  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster
  @locking.ssynchronized(_config_lock)
  def Update(self, target):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure that it's saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    # check the target is a known object and decide whether the
    # cluster-level serial number must be bumped as well
    bump_cluster_serial = False
    if isinstance(target, objects.Cluster):
      known = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      known = target in self._config_data.nodes.values()
      # node updates also require a cluster serial bump
      bump_cluster_serial = True
    elif isinstance(target, objects.Instance):
      known = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not known:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1

    if bump_cluster_serial:
      self._config_data.cluster.serial_no += 1

    if isinstance(target, objects.Instance):
      # the instance's reserved resources are now committed
      self._UnlockedReleaseDRBDMinors(target.name)
      for nic in target.nics:
        self._temporary_macs.discard(nic.mac)

    self._WriteConfig()