Statistics
| Branch: | Tag: | Revision:

root / lib / config.py @ d48663e4

History | View | Annotate | Download (29.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Configuration management for Ganeti
23

24
This module provides the interface to the Ganeti cluster configuration.
25

26
The configuration data is stored on every node but is updated on the master
27
only. After each update, the master distributes the data to the other nodes.
28

29
Currently, the data storage format is JSON. YAML was slow and consuming too
30
much memory.
31

32
"""
33

    
34
import os
35
import tempfile
36
import random
37
import logging
38

    
39
from ganeti import errors
40
from ganeti import locking
41
from ganeti import utils
42
from ganeti import constants
43
from ganeti import rpc
44
from ganeti import objects
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
# Module-level lock protecting the cluster configuration; ConfigWriter
# methods acquire it via the locking.ssynchronized decorator, shared
# (shared=1) for read-only accessors and exclusive for modifications.
_config_lock = locking.SharedLock()
50

    
51

    
52
def ValidateConfig():
  """Verify that the stored configuration is compatible with this code.

  Compares the version recorded in the simple store against
  constants.CONFIG_VERSION and raises errors.ConfigurationError when
  they differ.

  """
  sstore = ssconf.SimpleStore()

  if constants.CONFIG_VERSION != sstore.GetConfigVersion():
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (sstore.GetConfigVersion(),
                                     constants.CONFIG_VERSION))
60

    
61

    
62
class ConfigWriter:
63
  """The interface to the cluster configuration.
64

65
  """
66
  def __init__(self, cfg_file=None, offline=False):
    """Initialise the configuration writer.

    Args:
      cfg_file: path of the configuration file, or None to use the
                cluster-wide default
      offline: when True, writes are not distributed to the other nodes

    """
    self.write_count = 0
    self._lock = _config_lock
    self._offline = offline
    # cached configuration plus the stat() fields used to detect
    # whether the on-disk file changed since we last parsed it
    self._config_data = None
    self._config_time = None
    self._config_size = None
    self._config_inode = None
    if cfg_file is None:
      cfg_file = constants.CLUSTER_CONF_FILE
    self._cfg_file = cfg_file
    self._temporary_ids = set()
    self._temporary_drbds = {}
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = utils.HostInfo().name
85

    
86
  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    Returns True when the cluster configuration file exists on this
    node, False otherwise.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)
93

    
94
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self):
    """Generate a MAC for an instance.

    The generated address uses the cluster MAC prefix and is checked
    against the MACs of all configured instances; after 64 failed
    attempts errors.ConfigurationError is raised.

    """
    self._OpenConfig()
    prefix = self._config_data.cluster.mac_prefix
    all_macs = self._AllMACs()
    for _ in range(64):
      # three random octets appended to the cluster prefix
      octets = tuple(random.randrange(0, 256) for _ in range(3))
      mac = "%s:%02x:%02x:%02x" % ((prefix,) + octets)
      if mac not in all_macs:
        return mac
    raise errors.ConfigurationError("Can't generate unique MAC")
116

    
117
  @locking.ssynchronized(_config_lock, shared=1)
  def IsMacInUse(self, mac):
    """Predicate: check if the specified MAC is in use in the Ganeti cluster.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    self._OpenConfig()
    return mac in self._AllMACs()
128

    
129
  def _ComputeAllLVs(self):
    """Compute the set of all LV names used by configured instances.

    """
    self._OpenConfig()
    lvnames = set()
    for inst in self._config_data.instances.values():
      # MapLVsByNode returns a node -> LV-name-list mapping
      for lv_list in inst.MapLVsByNode().values():
        lvnames.update(lv_list)
    return lvnames
140

    
141
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, exceptions=None):
    """Generate an unique disk name.

    This checks the current node, instances and disk names for
    duplicates.

    Args:
      - exceptions: a list with some other names which should be checked
                    for uniqueness (used for example when you want to get
                    more than one id at one time without adding each one in
                    turn to the config file)

    Returns: the unique id as a string

    Raises:
      errors.ConfigurationError: if no unique id could be produced
        after 64 attempts

    """
    existing = set()
    existing.update(self._temporary_ids)
    # _ComputeAllLVs calls _OpenConfig, so _config_data is valid below
    existing.update(self._ComputeAllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    if exceptions is not None:
      existing.update(exceptions)
    unique_id = None
    retries = 64
    while retries > 0:
      unique_id = utils.NewUUID()
      if unique_id is not None and unique_id not in existing:
        break
      # FIX: the retry counter was never decremented, so a persistent
      # collision would loop forever and the else-clause was unreachable
      retries -= 1
    else:
      # FIX: the error message was missing the closing parenthesis
      raise errors.ConfigurationError("Not able generate an unique ID"
                                      " (last tried ID: %s)" % unique_id)
    self._temporary_ids.add(unique_id)
    return unique_id
174

    
175
  def _AllMACs(self):
    """Return all MACs present in the config.

    """
    self._OpenConfig()

    return [nic.mac
            for instance in self._config_data.instances.values()
            for nic in instance.nics]
187

    
188
  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify the configuration for internal consistency.

    Checks instance node references, duplicate NIC MACs and duplicate
    TCP/UDP port reservations (DRBD disk ports, instance network ports
    and the cluster free-port pool), plus the highest_used_port
    counter.

    Returns: a list of human-readable error messages; empty if the
    configuration is consistent.

    """
    self._OpenConfig()

    result = []
    seen_macs = []
    # maps port number -> list of (owner, description) claiming it
    ports = {}
    data = self._config_data
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          # for DRBD disks the third element of logical_id is the port
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

    # cluster-wide pool of free ports
    for free_port in self._config_data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = ", ".join(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > self._config_data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (self._config_data.cluster.highest_used_port,
                       keys[-1]))

    return result
251

    
252
  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when the only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    # recurse into children first so the whole tree is converted
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    # a physical_id with no logical_id means there is nothing to derive
    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      # logical_id layout for DRBD8: (pnode, snode, port, pminor, sminor)
      pnode, snode, port, pminor, sminor = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      # local endpoint first, then the peer, then our minor number
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor,)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor,)
    else:
      # non-DRBD devices use the logical id unchanged
      disk.physical_id = disk.logical_id
    return
289

    
290
  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    Lock-taking wrapper around _UnlockedSetDiskID; see there for the
    details of the conversion (drbd ip/port setup, recursion into the
    children of the device).

    """
    return self._UnlockedSetDiskID(disk, node_name)
302

    
303
  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Return a port to the pool of available TCP/UDP ports.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._OpenConfig()
    cluster = self._config_data.cluster
    cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()
314

    
315
  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    self._OpenConfig()
    pool = self._config_data.cluster.tcpudp_port_pool
    return pool.copy()
322

    
323
  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    self._OpenConfig()

    cluster = self._config_data.cluster
    # If there are TCP/IP ports configured, we use them first.
    if cluster.tcpudp_port_pool:
      port = cluster.tcpudp_port_pool.pop()
    else:
      port = cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      cluster.highest_used_port = port

    self._WriteConfig()
    return port
347

    
348
  def _ComputeDRBDMap(self, instance):
    """Compute the used DRBD minor/nodes.

    Return: dictionary of node_name: dict of minor: instance_name. The
    returned dict will have all the nodes in it (even if with an empty
    list).

    """
    # NOTE(review): the `instance` parameter is unused here; the names
    # `instance` below are the loop variables, which shadow it
    def _AppendUsedPorts(instance_name, disk, used):
      # record the two (node, minor) pairs of a DRBD8 device in `used`,
      # recursing into child devices
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) == 5:
        # logical_id: (nodeA, nodeB, port, minorA, minorB)
        nodeA, nodeB, dummy, minorA, minorB = disk.logical_id
        for node, port in ((nodeA, minorA), (nodeB, minorB)):
          assert node in used, "Instance node not found in node list"
          if port in used[node]:
            raise errors.ProgrammerError("DRBD minor already used:"
                                         " %s/%s, %s/%s" %
                                         (node, port, instance_name,
                                          used[node][port]))

          used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          _AppendUsedPorts(instance_name, child, used)

    # start with an empty minor map for every configured node
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    # include minors reserved but not yet committed to the config
    for (node, minor), instance in self._temporary_drbds.iteritems():
      my_dict[node][minor] = instance
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        _AppendUsedPorts(instance.name, disk, my_dict)
    return my_dict
379

    
380
  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    The reservations are recorded in _temporary_drbds and must later be
    released via ReleaseDRBDMinors.

    """
    self._OpenConfig()

    d_map = self._ComputeDRBDMap(instance)
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      # first gap in the sorted list of used minors, if any
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      result.append(minor)
      # update ndata too, so a later iteration for the same node sees
      # this minor as taken
      ndata[minor] = instance
      assert (nname, minor) not in self._temporary_drbds, \
             "Attempt to reuse reserved DRBD minor"
      self._temporary_drbds[(nname, minor)] = instance
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
419

    
420
  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on both the error paths and on the success
    paths (after the instance has been added or updated).

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    # collect the keys first, then delete, to avoid mutating the dict
    # while scanning it
    stale = [key for key, name in self._temporary_drbds.items()
             if name == instance]
    for key in stale:
      del self._temporary_drbds[key]
435

    
436
  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    Returns: rsa hostkey

    """
    self._OpenConfig()
    cluster = self._config_data.cluster
    return cluster.rsahostkeypub
446

    
447
  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance):
    """Add an instance to the config.

    This should be used after creating a new instance.

    Args:
      instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      node_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, node_lvs)

    self._OpenConfig()
    # new objects start their life at serial number one
    instance.serial_no = 1
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
468

    
469
  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    The configuration is only written out when the status actually
    changes.

    """
    if status not in ("up", "down"):
      raise errors.ProgrammerError("Invalid status '%s' passed to"
                                   " ConfigWriter._SetInstanceStatus()" %
                                   status)
    self._OpenConfig()

    try:
      instance = self._config_data.instances[instance_name]
    except KeyError:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    if instance.status == status:
      return
    instance.status = status
    instance.serial_no += 1
    self._WriteConfig()
487

    
488
  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    """
    self._SetInstanceStatus(instance_name, "up")
494

    
495
  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    self._OpenConfig()

    try:
      del self._config_data.instances[instance_name]
    except KeyError:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
507

    
508
  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    self._OpenConfig()
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        # logical_id[1] is the disk file path; stripping two path
        # components yields the storage dir above the per-instance dir
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              os.path.join(file_storage_dir,
                                                           inst.name,
                                                           disk.iv_name))

    # re-insert under the new name
    self._config_data.instances[inst.name] = inst
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
536

    
537
  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    """
    self._SetInstanceStatus(instance_name, "down")
543

    
544
  def _UnlockedGetInstanceList(self):
    """Get the list of instance names.

    This function is for internal use, when the config lock is already held.

    """
    self._OpenConfig()
    return list(self._config_data.instances)
552

    
553
  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    Returns:
      array of instances, ex. ['instance2.example.com','instance1.example.com']
      these contains all the instances, also the ones in Admin_down state

    """
    return self._UnlockedGetInstanceList()
563

    
564
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    self._OpenConfig()

    candidates = self._config_data.instances.keys()
    return utils.MatchNameComponent(short_name, candidates)
573

    
574
  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance, or None if unknown.

    This function is for internal use, when the config lock is already held.

    """
    self._OpenConfig()

    return self._config_data.instances.get(instance_name, None)
586

    
587
  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns informations about an instance.

    It takes the information from the configuration file. Other informations of
    an instance are taken from the live systems.

    Args:
      instance: name of the instance, ex instance1.example.com

    Returns:
      the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)
602

    
603
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @returns: dict of (instance, instance_info), where instance_info is what
              would GetInstanceInfo return for the node

    """
    return dict((name, self._UnlockedGetInstanceInfo(name))
                for name in self._UnlockedGetInstanceList())
615

    
616
  @locking.ssynchronized(_config_lock)
  def AddNode(self, node):
    """Add a node to the configuration.

    Args:
      node: an object.Node instance

    """
    # FIX: pass the argument lazily to logging instead of eagerly
    # %-formatting it, matching the style used elsewhere in this module
    logging.info("Adding node %s to configuration", node.name)

    self._OpenConfig()
    # new objects start their life at serial number one
    node.serial_no = 1
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
631

    
632
  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    Raises errors.ConfigurationError if the node is not known.

    """
    # FIX: pass the argument lazily to logging instead of eagerly
    # %-formatting it, matching the style used elsewhere in this module
    logging.info("Removing node %s from configuration", node_name)

    self._OpenConfig()
    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
646

    
647
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    # FIX: the docstring said "instance name" (copy-pasted from
    # ExpandInstanceName); this method matches against node names
    self._OpenConfig()

    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys())
656

    
657
  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already held.

    Args: node: nodename (tuple) of the node

    Returns: the node object, or None if the node is not known

    """
    self._OpenConfig()

    return self._config_data.nodes.get(node_name, None)
673

    
674

    
675
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    Args: node: nodename (tuple) of the node

    Returns: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)
685

    
686
  def _UnlockedGetNodeList(self):
    """Return the list of node names in the configuration.

    This function is for internal use, when the config lock is already held.

    """
    self._OpenConfig()
    return list(self._config_data.nodes)
694

    
695

    
696
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()
702

    
703
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @returns: dict of (node, node_info), where node_info is what
              would GetNodeInfo return for the node

    """
    return dict((name, self._UnlockedGetNodeInfo(name))
                for name in self._UnlockedGetNodeList())
715

    
716
  @locking.ssynchronized(_config_lock, shared=1)
  def DumpConfig(self):
    """Return the entire configuration of the cluster.

    """
    self._OpenConfig()
    return self._config_data
722

    
723
  def _BumpSerialNo(self):
    """Increment the serial number of the top-level configuration.

    """
    self._config_data.serial_no += 1
728

    
729
  def _OpenConfig(self):
    """Read the config data from disk.

    In case we already have configuration data and the config file has
    the same mtime as when we read it, we skip the parsing of the
    file, since de-serialisation could be slow.

    """
    try:
      st = os.stat(self._cfg_file)
    except OSError, err:
      raise errors.ConfigurationError("Can't stat config file: %s" % err)
    # cache check: mtime, size and inode together detect both in-place
    # rewrites and atomic rename-over replacements of the file
    if (self._config_data is not None and
        self._config_time is not None and
        self._config_time == st.st_mtime and
        self._config_size == st.st_size and
        self._config_inode == st.st_ino):
      # data is current, so skip loading of config file
      return

    # Make sure the configuration has the right version
    ValidateConfig()

    f = open(self._cfg_file, 'r')
    try:
      try:
        # any parse/deserialisation failure is reported uniformly as a
        # configuration error
        data = objects.ConfigData.FromDict(serializer.Load(f.read()))
      except Exception, err:
        raise errors.ConfigurationError(err)
    finally:
      f.close()
    # minimal sanity check before accepting the parsed data
    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")
    self._config_data = data
    # remember the stat fields for the cache check above
    self._config_time = st.st_mtime
    self._config_size = st.st_size
    self._config_inode = st.st_ino
768

    
769
  def _DistributeConfig(self):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    Returns True on success (or in offline mode), False if the upload
    to at least one node failed.

    """
    if self._offline:
      return True

    # push to every node except ourselves
    nodelist = self._UnlockedGetNodeList()
    if self._my_hostname in nodelist:
      nodelist.remove(self._my_hostname)

    result = rpc.call_upload_file(nodelist, self._cfg_file)
    bad = False
    for node in nodelist:
      if not result[node]:
        logging.error("copy of file %s to node %s failed",
                      self._cfg_file, node)
        bad = True
    return not bad
794

    
795
  def _WriteConfig(self, destination=None):
    """Write the configuration data to persistent storage.

    The file is written atomically: the data goes to a temporary file
    in the same directory, is fsynced, and is then renamed over the
    destination. Afterwards the stat cache is refreshed and the new
    file is distributed to the other nodes.

    """
    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())
    dir_name, file_name = os.path.split(destination)
    # temporary file in the same directory, so the rename below stays
    # on one filesystem and is atomic
    fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
    f = os.fdopen(fd, 'w')
    try:
      f.write(txt)
      # flush to disk before the rename makes the new file visible
      os.fsync(f.fileno())
    finally:
      f.close()
    # we don't need to do os.close(fd) as f.close() did it
    os.rename(name, destination)
    self.write_count += 1
    # re-set our cache as not to re-read the config file
    try:
      st = os.stat(destination)
    except OSError, err:
      raise errors.ConfigurationError("Can't stat config file: %s" % err)
    self._config_time = st.st_mtime
    self._config_size = st.st_size
    self._config_inode = st.st_ino
    # and redistribute the config file
    self._DistributeConfig()
824

    
825
  @locking.ssynchronized(_config_lock)
  def InitConfig(self, node, primary_ip, secondary_ip,
                 hostkeypub, mac_prefix, vg_name, def_bridge):
    """Create the initial cluster configuration.

    It will contain the current node, which will also be the master
    node, and no instances or operating systems.

    Args:
      node: the nodename of the initial node
      primary_ip: the IP address of the current host
      secondary_ip: the secondary IP of the current host or None
      hostkeypub: the public hostkey of this host
      mac_prefix: prefix used for the MAC addresses of instances
      vg_name: name of the LVM volume group
      def_bridge: default bridge name for instance NICs

    """
    # AllocatePort hands out highest_used_port + 1, so starting one
    # below FIRST_DRBD_PORT makes that port the first one allocated
    hu_port = constants.FIRST_DRBD_PORT - 1
    globalconfig = objects.Cluster(serial_no=1,
                                   rsahostkeypub=hostkeypub,
                                   highest_used_port=hu_port,
                                   mac_prefix=mac_prefix,
                                   volume_group_name=vg_name,
                                   default_bridge=def_bridge,
                                   tcpudp_port_pool=set())
    # with no secondary network, the primary address doubles as secondary
    if secondary_ip is None:
      secondary_ip = primary_ip
    nodeconfig = objects.Node(name=node, primary_ip=primary_ip,
                              secondary_ip=secondary_ip, serial_no=1)

    self._config_data = objects.ConfigData(nodes={node: nodeconfig},
                                           instances={},
                                           cluster=globalconfig,
                                           serial_no=1)
    self._WriteConfig()
858

    
859
  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    self._OpenConfig()
    cluster = self._config_data.cluster
    return cluster.volume_group_name
866

    
867
  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._OpenConfig()
    cluster = self._config_data.cluster
    cluster.volume_group_name = vg_name
    cluster.serial_no += 1
    self._WriteConfig()
876

    
877
  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefBridge(self):
    """Return the default bridge.

    """
    self._OpenConfig()
    cluster = self._config_data.cluster
    return cluster.default_bridge
884

    
885
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the mac prefix.

    """
    self._OpenConfig()
    cluster = self._config_data.cluster
    return cluster.mac_prefix
892

    
893
  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns informations about the cluster

    Returns:
      the cluster object

    """
    self._OpenConfig()

    return self._config_data.cluster
904

    
905
  @locking.ssynchronized(_config_lock)
906
  def Update(self, target):
907
    """Notify function to be called after updates.
908

909
    This function must be called when an object (as returned by
910
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
911
    caller wants the modifications saved to the backing store. Note
912
    that all modified objects will be saved, but the target argument
913
    is the one the caller wants to ensure that it's saved.
914

915
    """
916
    if self._config_data is None:
917
      raise errors.ProgrammerError("Configuration file not read,"
918
                                   " cannot save.")
919
    if isinstance(target, objects.Cluster):
920
      test = target == self._config_data.cluster
921
    elif isinstance(target, objects.Node):
922
      test = target in self._config_data.nodes.values()
923
    elif isinstance(target, objects.Instance):
924
      test = target in self._config_data.instances.values()
925
    else:
926
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
927
                                   " ConfigWriter.Update" % type(target))
928
    if not test:
929
      raise errors.ConfigurationError("Configuration updated since object"
930
                                      " has been read or unknown object")
931
    target.serial_no += 1
932

    
933
    self._WriteConfig()