Statistics
| Branch: | Tag: | Revision:

root / lib / config.py @ f9780ccd

History | View | Annotate | Download (40.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Configuration management for Ganeti
23

24
This module provides the interface to the Ganeti cluster configuration.
25

26
The configuration data is stored on every node but is updated on the master
27
only. After each update, the master distributes the data to the other nodes.
28

29
Currently, the data storage format is JSON. YAML was slow and consuming too
30
much memory.
31

32
"""
33

    
34
import os
35
import tempfile
36
import random
37
import logging
38

    
39
from ganeti import errors
40
from ganeti import locking
41
from ganeti import utils
42
from ganeti import constants
43
from ganeti import rpc
44
from ganeti import objects
45
from ganeti import serializer
46

    
47

    
48
# Module-level lock guarding all configuration accesses; methods below
# acquire it shared (shared=1) for reads and exclusively for writes via
# the locking.ssynchronized decorator.
_config_lock = locking.SharedLock()
49

    
50

    
51
def _ValidateConfig(data):
  """Check that a configuration object carries the expected version.

  Only the version field is examined; no structural validation is done.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  expected = constants.CONFIG_VERSION
  if data.version == expected:
    return
  raise errors.ConfigurationError("Cluster configuration version"
                                  " mismatch, got %s instead of %s" %
                                  (data.version, expected))
65

    
66

    
67
class ConfigWriter:
68
  """The interface to the cluster configuration.
69

70
  """
71
  def __init__(self, cfg_file=None, offline=False):
    """Initialize the writer and load the configuration from disk.

    @param cfg_file: path to the configuration file; when C{None},
        L{constants.CLUSTER_CONF_FILE} is used
    @param offline: stored in C{_offline}; presumably controls whether
        the configuration is distributed to other nodes

    """
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      cfg_file = constants.CLUSTER_CONF_FILE
    self._cfg_file = cfg_file
    self._temporary_ids = set()
    self._temporary_drbds = {}
    self._temporary_macs = set()
    # Resolve our own hostname once, up front: failing here is better
    # than failing in _DistributeConfig after the config file was
    # already modified.
    self._my_hostname = utils.HostInfo().name
    self._last_cluster_serial = -1
    self._OpenConfig()
90

    
91
  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    @rtype: bool
    @return: whether the cluster configuration file exists on disk

    """
    conf_path = constants.CLUSTER_CONF_FILE
    return os.path.exists(conf_path)
98

    
99
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self):
    """Generate a fresh MAC address for an instance.

    The candidate is checked against all MACs already in the
    configuration and against temporarily reserved ones; up to 64
    attempts are made before giving up.

    """
    prefix = self._config_data.cluster.mac_prefix
    used = self._AllMACs()
    for _ in range(64):
      b1 = random.randrange(0, 256)
      b2 = random.randrange(0, 256)
      b3 = random.randrange(0, 256)
      mac = "%s:%02x:%02x:%02x" % (prefix, b1, b2, b3)
      if mac not in used and mac not in self._temporary_macs:
        break
    else:
      raise errors.ConfigurationError("Can't generate unique MAC")
    self._temporary_macs.add(mac)
    return mac
121

    
122
  @locking.ssynchronized(_config_lock, shared=1)
  def IsMacInUse(self, mac):
    """Predicate: check if the specified MAC is in use in the Ganeti cluster.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    known = self._AllMACs()
    return (mac in known) or (mac in self._temporary_macs)
132

    
133
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self):
    """Generate a DRBD secret not yet used by any configured disk.

    Up to 64 candidates are tried before giving up.

    """
    existing = self._AllDRBDSecrets()
    for _ in range(64):
      secret = utils.GenerateSecret()
      if secret not in existing:
        return secret
    raise errors.ConfigurationError("Can't generate unique DRBD secret")
150

    
151
  def _ComputeAllLVs(self):
    """Compute the set of all LV names used by any instance.

    @rtype: set
    @return: the logical volume names, across all nodes

    """
    result = set()
    for inst in self._config_data.instances.values():
      node_data = inst.MapLVsByNode()
      for lvs in node_data.values():
        result.update(lvs)
    return result
161

    
162
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, exceptions=None):
    """Generate an unique disk name.

    This checks the current node, instances and disk names for
    duplicates.

    @param exceptions: a list with some other names which should be checked
        for uniqueness (used for example when you want to get
        more than one id at one time without adding each one in
        turn to the config file)

    @rtype: string
    @return: the unique id

    """
    existing = set()
    existing.update(self._temporary_ids)
    existing.update(self._ComputeAllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    if exceptions is not None:
      existing.update(exceptions)
    retries = 64
    while retries > 0:
      unique_id = utils.NewUUID()
      if unique_id not in existing and unique_id is not None:
        break
      # FIX: the counter was previously never decremented, so a run of
      # colliding IDs would loop forever and the else: branch below
      # could never be reached
      retries -= 1
    else:
      # while/else: only reached when retries ran out without a break
      raise errors.ConfigurationError("Not able to generate an unique ID"
                                      " (last tried ID: %s)" % unique_id)
    self._temporary_ids.add(unique_id)
    return unique_id
195

    
196
  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    macs = []
    for instance in self._config_data.instances.values():
      macs.extend([nic.mac for nic in instance.nics])
    return macs
209

    
210
  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def _Collect(disk, acc):
      """Recursively gather secrets from this disk and its children."""
      if disk.dev_type == constants.DT_DRBD8:
        # for DRBD8 disks the secret is the sixth logical_id element
        acc.append(disk.logical_id[5])
      for child in (disk.children or []):
        _Collect(child, acc)

    secrets = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        _Collect(disk, secrets)
    return secrets
231

    
232
  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    msgs = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        msgs.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        msgs.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    # recurse into child disks, sharing the same accumulator lists
    for child in (disk.children or []):
      msgs.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return msgs
261

    
262
  def _UnlockedVerifyConfig(self):
    """Verify the configuration (lock already held by the caller).

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not data.cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)

    if data.cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    data.cluster.master_node)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = ", ".join(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node in data.nodes.values():
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        # FIX: this previously read the non-existent attribute
        # node.drain (the condition above uses node.drained), which
        # raised AttributeError instead of reporting the invalid state
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))

    # drbd minors check
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    return result
371

    
372
  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify the configuration while holding the config lock (shared).

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    errs = self._UnlockedVerifyConfig()
    return errs
384

    
385
  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote node.

    This function is for internal use, when the config lock is already held.

    """
    # children first, so the whole tree is converted bottom-up
    for child in (disk.children or []):
      self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      # nothing to convert
      return
    if disk.dev_type != constants.LD_DRBD8:
      # non-DRBD devices simply mirror the logical id
      disk.physical_id = disk.logical_id
      return

    pnode, snode, port, pminor, sminor, secret = disk.logical_id
    if node_name not in (pnode, snode):
      raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                      node_name)
    pnode_info = self._UnlockedGetNodeInfo(pnode)
    snode_info = self._UnlockedGetNodeInfo(snode)
    if pnode_info is None or snode_info is None:
      raise errors.ConfigurationError("Can't find primary or secondary node"
                                      " for %s" % str(disk))
    p_data = (pnode_info.secondary_ip, port)
    s_data = (snode_info.secondary_ip, port)
    if pnode == node_name:
      # local end first, then the remote end, then our minor/secret
      disk.physical_id = p_data + s_data + (pminor, secret)
    else:
      # it must be the secondary, we tested above
      disk.physical_id = s_data + p_data + (sminor, secret)
422

    
423
  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    Locked wrapper over L{_UnlockedSetDiskID}; it descends into the
    disk's children as well.

    """
    return self._UnlockedSetDiskID(disk, node_name)
435

    
436
  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    pool = self._config_data.cluster.tcpudp_port_pool
    pool.add(port)
    self._WriteConfig()
446

    
447
  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    pool = self._config_data.cluster.tcpudp_port_pool
    return pool.copy()
453

    
454
  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    cluster = self._config_data.cluster
    if cluster.tcpudp_port_pool:
      # prefer reusing a previously released port
      port = cluster.tcpudp_port_pool.pop()
    else:
      port = cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      cluster.highest_used_port = port

    self._WriteConfig()
    return port
476

    
477
  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      # Recursively record this disk's (node, minor) pairs in 'used',
      # returning any (node, minor, new_owner, old_owner) clashes found.
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        # logical_id layout: (node_a, node_b, port, minor_a, minor_b, ...)
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    # start with an empty minor map for every known node
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    # temporary (in-flight) reservations are merged on top; a clash with
    # a different instance is reported as a duplicate too
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates
515

    
516
  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list).

    """
    minor_map, clashes = self._UnlockedComputeDRBDMap()
    if clashes:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(clashes))
    return minor_map
532

    
533
  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      # find the lowest unused minor on this node
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      # record it in the map so a later node in this same call sees it
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
588

    
589
  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    # collect first, then delete, to avoid mutating while iterating
    stale = [key for key, name in self._temporary_drbds.items()
             if name == instance]
    for key in stale:
      del self._temporary_drbds[key]
602

    
603
  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)
619

    
620
  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    data = self._config_data
    return data.version
628

    
629
  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    cluster = self._config_data.cluster
    return cluster.cluster_name
637

    
638
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    cluster = self._config_data.cluster
    return cluster.master_node
646

    
647
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    cluster = self._config_data.cluster
    return cluster.master_ip
655

    
656
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    @return: the name of the network device used by the master

    """
    cluster = self._config_data.cluster
    return cluster.master_netdev
662

    
663
  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    @return: the cluster-wide file storage directory

    """
    cluster = self._config_data.cluster
    return cluster.file_storage_dir
669

    
670
  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    @return: the first (default) enabled hypervisor

    """
    enabled = self._config_data.cluster.enabled_hypervisors
    return enabled[0]
676

    
677
  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    cluster = self._config_data.cluster
    return cluster.rsahostkeypub
686

    
687
  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      lv_layout = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, lv_layout)

    # refuse to add if any of its MACs is already taken
    used_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in used_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
          " MAC address '%s' already in use." % (instance.name, nic.mac))

    instance.serial_no = 1
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    # the instance is committed now, so drop its temporary reservations
    self._UnlockedReleaseDRBDMinors(instance.name)
    for nic in instance.nics:
      self._temporary_macs.discard(nic.mac)
    self._WriteConfig()
717

    
718
  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert isinstance(status, bool), \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    try:
      instance = self._config_data.instances[instance_name]
    except KeyError:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    if instance.admin_up == status:
      # nothing changed, avoid a needless config write
      return
    instance.admin_up = status
    instance.serial_no += 1
    self._WriteConfig()
733

    
734
  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Record in the configuration that the instance is administratively up.

    """
    self._SetInstanceStatus(instance_name, True)
740

    
741
  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    try:
      del self._config_data.instances[instance_name]
    except KeyError:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
751

    
752
  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances.pop(old_name)
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type != constants.LD_FILE:
        continue
      # rename the file paths in logical and physical id
      base_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
      new_path = os.path.join(base_dir, inst.name, disk.iv_name)
      disk.physical_id = disk.logical_id = (disk.logical_id[0], new_path)

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()
778

    
779
  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Record in the configuration that the instance is administratively down.

    """
    self._SetInstanceStatus(instance_name, False)
785

    
786
  def _UnlockedGetInstanceList(self):
    """Get the list of instance names.

    This function is for internal use, when the config lock is already held.

    """
    return list(self._config_data.instances)
793

    
794
  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    names = self._UnlockedGetInstanceList()
    return names
803

    
804
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    candidates = self._config_data.instances.keys()
    return utils.MatchNameComponent(short_name, candidates)
811

    
812
  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance, or None if unknown.

    This function is for internal use, when the config lock is already held.

    """
    # dict.get returns None for unknown names, matching the old
    # explicit membership check
    return self._config_data.instances.get(instance_name, None)
822

    
823
  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information of
    an instance are taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    info = self._UnlockedGetInstanceInfo(instance_name)
    return info
838

    
839
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
              would GetInstanceInfo return for the node

    """
    names = self._UnlockedGetInstanceList()
    return dict((name, self._UnlockedGetInstanceInfo(name))
                for name in names)
851

    
852
  @locking.ssynchronized(_config_lock)
  def AddNode(self, node):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    # lazy %-args (FIX: was eager "%" formatting, which formats even
    # when the INFO level is disabled)
    logging.info("Adding node %s to configuration", node.name)

    node.serial_no = 1
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
866

    
867
  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    @param node_name: the name of the node to remove

    @raise errors.ConfigurationError: if the node is not in the
        configuration

    """
    # Pass the argument lazily instead of eagerly %-formatting the
    # message; the logging framework formats only if the record is emitted.
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    # (docstring previously said "instance name" -- copy-paste error;
    # this expands against the node list)
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys())
  def _UnlockedGetNodeInfo(self, node_name):
890
    """Get the configuration of a node, as stored in the config.
891

892
    This function is for internal use, when the config lock is already
893
    held.
894

895
    @param node_name: the node name, e.g. I{node1.example.com}
896

897
    @rtype: L{objects.Node}
898
    @return: the node object
899

900
    """
901
    if node_name not in self._config_data.nodes:
902
      return None
903

    
904
    return self._config_data.nodes[node_name]
905

    
906

    
907
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    # Shared-lock wrapper around the internal lookup helper.
    return self._UnlockedGetNodeInfo(node_name)
  def _UnlockedGetNodeList(self):
922
    """Return the list of nodes which are in the configuration.
923

924
    This function is for internal use, when the config lock is already
925
    held.
926

927
    @rtype: list
928

929
    """
930
    return self._config_data.nodes.keys()
931

    
932

    
933
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    # Shared-lock wrapper around the internal list helper.
    return self._UnlockedGetNodeList()
  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    online = []
    for name in self._UnlockedGetNodeList():
      ninfo = self._UnlockedGetNodeInfo(name)
      # skip nodes flagged offline
      if not ninfo.offline:
        online.append(ninfo.name)
    return online
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
              would GetNodeInfo return for the node

    """
    names = self._UnlockedGetNodeList()
    return dict((name, self._UnlockedGetNodeInfo(name))
                for name in names)
  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
963
    """Get the number of current and maximum desired and possible candidates.
964

965
    @type exceptions: list
966
    @param exceptions: if passed, list of nodes that should be ignored
967
    @rtype: tuple
968
    @return: tuple of (current, desired and possible)
969

970
    """
971
    mc_now = mc_max = 0
972
    for node in self._config_data.nodes.values():
973
      if exceptions and node.name in exceptions:
974
        continue
975
      if not (node.offline or node.drained):
976
        mc_max += 1
977
      if node.master_candidate:
978
        mc_now += 1
979
    mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
980
    return (mc_now, mc_max)
981

    
982
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum possible candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, max)

    """
    # Shared-lock wrapper around the internal counting helper.
    return self._UnlockedGetMasterCandidateStats(exceptions)
  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self):
    """Try to grow the candidate pool to the desired size.

    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
    adjusted = []
    if mc_now < mc_max:
      # promote eligible nodes in random order to avoid any bias
      names = self._config_data.nodes.keys()
      random.shuffle(names)
      for name in names:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        # skip nodes which are already candidates or are not eligible
        if node.master_candidate or node.offline or node.drained:
          continue
        node.master_candidate = True
        node.serial_no += 1
        adjusted.append(node)
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if adjusted:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return adjusted
  def _BumpSerialNo(self):
1030
    """Bump up the serial number of the config.
1031

1032
    """
1033
    self._config_data.serial_no += 1
1034

    
1035
  def _OpenConfig(self):
    """Read the config data from disk.

    Loads and deserializes the configuration from C{self._cfg_file},
    validates it and stores the result as C{self._config_data}.

    @raise errors.ConfigurationError: if the file cannot be
        deserialized, has a wrong version, or misses required fields

    """
    f = open(self._cfg_file, 'r')
    try:
      try:
        data = objects.ConfigData.FromDict(serializer.Load(f.read()))
      except Exception, err:
        # wrap any deserialization problem into a configuration error
        raise errors.ConfigurationError(err)
    finally:
      f.close()

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    # a minimal sanity check beyond the version: the cluster section and
    # its SSH host key must be present
    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")
    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1
  def _DistributeConfig(self):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    @rtype: bool
    @return: True if the copy succeeded for all master candidates (or
        the writer is offline), False otherwise

    """
    if self._offline:
      # an offline writer has nothing to distribute
      return True
    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        # the local copy is written by _WriteConfig itself
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        # only master candidates receive a full configuration copy
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    for to_node, to_result in result.items():
      msg = to_result.RemoteFailMsg()
      if msg:
        # best-effort: a failed upload is logged and reflected in the
        # return value, but does not raise
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)
        bad = True
    return not bad
  def _WriteConfig(self, destination=None):
    """Write the configuration data to persistent storage.

    Serializes the in-memory configuration, writes it atomically via a
    temporary file plus rename, redistributes it to the master
    candidates and, when the cluster serial advanced, pushes the ssconf
    values to all nodes.

    @param destination: optional alternative path to write to, instead
        of the default configuration file

    @raise errors.ConfigurationError: if the in-memory configuration
        fails verification

    """
    # refuse to persist an inconsistent configuration
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      raise errors.ConfigurationError("Configuration data is not"
                                      " consistent: %s" %
                                      (", ".join(config_errors)))
    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())
    dir_name, file_name = os.path.split(destination)
    # write to a temporary file in the same directory and rename over
    # the destination, so readers never see a partially-written file
    fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
    f = os.fdopen(fd, 'w')
    try:
      f.write(txt)
      # make sure the data reaches the disk before the rename
      os.fsync(f.fileno())
    finally:
      f.close()
    # we don't need to do os.close(fd) as f.close() did it
    # NOTE(review): if f.write/os.fsync raises, the temporary file is
    # left behind in the config directory -- consider unlinking it
    os.rename(name, destination)
    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig()

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = rpc.RpcRunner.call_write_ssconf_files(\
          self._UnlockedGetNodeList(),
          self._UnlockedGetSsconfValues())
        for nname, nresu in result.items():
          msg = nresu.RemoteFailMsg()
          if msg:
            # best-effort: a failed ssconf upload is only logged
            logging.warning("Error while uploading ssconf files to"
                            " node %s: %s", nname, msg)
      self._last_cluster_serial = self._config_data.cluster.serial_no
  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    joinlines = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = [ninfo.primary_ip for ninfo in node_info]
    node_snd_ips = [ninfo.secondary_ip for ninfo in node_info]

    # each ssconf value is a newline-joined list of strings
    instance_data = joinlines(instance_names)
    off_data = joinlines(node.name for node in node_info if node.offline)
    on_data = joinlines(node.name for node in node_info if not node.offline)
    mc_data = joinlines(node.name for node in node_info
                        if node.master_candidate)
    node_data = joinlines(node_names)
    node_pri_ips_data = joinlines(node_pri_ips)
    node_snd_ips_data = joinlines(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = joinlines(cluster.GetTags())
    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      }
  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    cluster = self._config_data.cluster
    return cluster.volume_group_name
  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    cluster = self._config_data.cluster
    cluster.volume_group_name = vg_name
    # bump the cluster serial so the change is redistributed
    cluster.serial_no += 1
    self._WriteConfig()
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the mac prefix.

    """
    cluster = self._config_data.cluster
    return cluster.mac_prefix
  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    # hand out the live cluster object; callers must use Update() to
    # persist any modifications they make to it
    return self._config_data.cluster
  @locking.ssynchronized(_config_lock)
  def Update(self, target):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure that it's saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster

    @raise errors.ProgrammerError: if the configuration was never read
        or the target has an unsupported type
    @raise errors.ConfigurationError: if the target is not the very
        object stored in the configuration

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    # identity dispatch: the target must be the exact object held in the
    # configuration, not a copy of it
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1

    if isinstance(target, objects.Instance):
      # instance updates release any DRBD minors reserved for it and
      # drop its NIC MACs from the temporary-reservation set
      self._UnlockedReleaseDRBDMinors(target.name)
      for nic in target.nics:
        self._temporary_macs.discard(nic.mac)

    self._WriteConfig()