Statistics
| Branch: | Tag: | Revision:

root / lib / config.py @ e7d81ba0

History | View | Annotate | Download (39.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Configuration management for Ganeti
23

24
This module provides the interface to the Ganeti cluster configuration.
25

26
The configuration data is stored on every node but is updated on the master
27
only. After each update, the master distributes the data to the other nodes.
28

29
Currently, the data storage format is JSON. YAML was slow and consuming too
30
much memory.
31

32
"""
33

    
34
import os
35
import tempfile
36
import random
37
import logging
38

    
39
from ganeti import errors
40
from ganeti import locking
41
from ganeti import utils
42
from ganeti import constants
43
from ganeti import rpc
44
from ganeti import objects
45
from ganeti import serializer
46

    
47

    
48
# Module-level lock serializing access to the configuration; the
# locking.ssynchronized decorators below take it in shared (read) or
# exclusive (write) mode per method.
_config_lock = locking.SharedLock()
49

    
50

    
51
def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  found = data.version
  expected = constants.CONFIG_VERSION
  if found != expected:
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (found, expected))
65

    
66

    
67
class ConfigWriter:
68
  """The interface to the cluster configuration.
69

70
  """
71
  def __init__(self, cfg_file=None, offline=False):
    """Initialize the configuration writer and load the config.

    @param cfg_file: path of the configuration file to use; when None,
        L{constants.CLUSTER_CONF_FILE} is used
    @param offline: stored in C{self._offline}; presumably disables
        distribution of the config to other nodes -- consumed outside
        this block, confirm against the write path

    """
    self.write_count = 0        # number of config writes performed
    self._lock = _config_lock   # module-level shared lock
    self._config_data = None    # populated by _OpenConfig() below
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    # reservations handed out but not yet committed to the config file
    self._temporary_ids = set()    # IDs from GenerateUniqueID
    self._temporary_drbds = {}     # (node, minor) -> instance name
    self._temporary_macs = set()   # MACs from GenerateMAC
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = utils.HostInfo().name
    self._last_cluster_serial = -1
    self._OpenConfig()
90

    
91
  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check whether a cluster configuration file already exists.

    """
    conf_path = constants.CLUSTER_CONF_FILE
    return os.path.exists(conf_path)
98

    
99
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self):
    """Generate a MAC address for an instance.

    The generated address is checked against the MACs of all existing
    instances and against the not-yet-committed reservations.

    """
    prefix = self._config_data.cluster.mac_prefix
    existing = self._AllMACs()
    for _ in range(64):
      byte1 = random.randrange(0, 256)
      byte2 = random.randrange(0, 256)
      byte3 = random.randrange(0, 256)
      mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
      if mac not in existing and mac not in self._temporary_macs:
        # reserve it until the instance is actually added
        self._temporary_macs.add(mac)
        return mac
    raise errors.ConfigurationError("Can't generate unique MAC")
121

    
122
  @locking.ssynchronized(_config_lock, shared=1)
  def IsMacInUse(self, mac):
    """Predicate: check if the specified MAC is in use in the Ganeti cluster.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    in_config = mac in self._AllMACs()
    return in_config or mac in self._temporary_macs
132

    
133
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self):
    """Generate a DRBD secret not yet used by any configured disk.

    """
    all_secrets = self._AllDRBDSecrets()
    for _ in range(64):
      secret = utils.GenerateSecret()
      if secret not in all_secrets:
        return secret
    raise errors.ConfigurationError("Can't generate unique DRBD secret")
150

    
151
  def _ComputeAllLVs(self):
    """Compute the set of all LV names used by configured instances.

    """
    all_lvs = set()
    for inst in self._config_data.instances.values():
      # MapLVsByNode gives node -> list of LVs; we only need the names
      for lv_list in inst.MapLVsByNode().values():
        all_lvs.update(lv_list)
    return all_lvs
161

    
162
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, exceptions=None):
    """Generate an unique ID.

    This checks the current node, instances and disk names for
    duplicates.

    @param exceptions: a list with some other names which should be checked
        for uniqueness (used for example when you want to get
        more than one id at one time without adding each one in
        turn to the config file)

    @rtype: string
    @return: the unique id

    """
    existing = set()
    existing.update(self._temporary_ids)
    existing.update(self._ComputeAllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    if exceptions is not None:
      existing.update(exceptions)
    retries = 64
    while retries > 0:
      unique_id = utils.NewUUID()
      if unique_id not in existing and unique_id is not None:
        break
      # FIX: the counter was never decremented before, so persistent
      # collisions would spin forever instead of reaching the else branch
      retries -= 1
    else:
      # FIX: the message previously had an unbalanced "(" before the ID
      raise errors.ConfigurationError("Not able generate an unique ID"
                                      " (last tried ID: %s)" % unique_id)
    self._temporary_ids.add(unique_id)
    return unique_id
195

    
196
  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    macs = []
    for instance in self._config_data.instances.values():
      macs.extend([nic.mac for nic in instance.nics])
    return macs
209

    
210
  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    secrets = []

    def _Collect(disk):
      """Recursively gather secrets from this disk and its children."""
      # for DRBD8, logical_id's sixth field is the shared secret
      if disk.dev_type == constants.DT_DRBD8:
        secrets.append(disk.logical_id[5])
      for child in (disk.children or []):
        _Collect(child)

    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        _Collect(disk)

    return secrets
231

    
232
  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    messages = []

    # logical id check; new ids are recorded in the caller's list
    if disk.logical_id in l_ids:
      messages.append("duplicate logical id %s" % str(disk.logical_id))
    else:
      l_ids.append(disk.logical_id)

    # physical id check, same accumulation scheme
    if disk.physical_id in p_ids:
      messages.append("duplicate physical id %s" % str(disk.physical_id))
    else:
      p_ids.append(disk.physical_id)

    for child in (disk.children or []):
      messages.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return messages
259

    
260
  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    seen_lids = []
    seen_pids = []
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = ", ".join(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node in data.nodes.values():
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        # FIX: the format arguments previously used "node.drain", which
        # does not exist (the condition above uses "node.drained"), so
        # reporting this error raised AttributeError instead
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))

    # drbd minors check
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    return result
355

    
356
  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a locked wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    messages = self._UnlockedVerifyConfig()
    return messages
368

    
369
  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when the only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    # convert the children first, so the whole tree is consistent
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    # nothing to derive the physical id from, but one is already set:
    # leave the disk untouched
    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      # physical_id becomes (own_ip, port, peer_ip, port, own_minor,
      # secret), i.e. local endpoint first from node_name's perspective
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      # non-DRBD devices: physical and logical ids are identical
      disk.physical_id = disk.logical_id
    return
406

    
407
  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when the only the top device is passed to the remote
    node.

    Locked wrapper over L{_UnlockedSetDiskID}.

    """
    return self._UnlockedSetDiskID(disk, node_name)
419

    
420
  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if isinstance(port, int):
      self._config_data.cluster.tcpudp_port_pool.add(port)
      self._WriteConfig()
    else:
      raise errors.ProgrammerError("Invalid type passed for port")
430

    
431
  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    pool = self._config_data.cluster.tcpudp_port_pool
    return pool.copy()
437

    
438
  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    pool = self._config_data.cluster.tcpudp_port_pool
    if pool:
      # reuse a previously released port first
      port = pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port
460

    
461
  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      """Record this disk's minors in 'used'; return found duplicates."""
      duplicates = []
      # DRBD8 logical_id starts with (nodeA, nodeB, port, minorA, minorB);
      # note that "port" below actually holds a minor number
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
        for node, port in ((nodeA, minorA), (nodeB, minorB)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      # recurse into child devices, which may also be DRBD
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    # seed with an empty minor map for every known node
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    # overlay the not-yet-committed reservations on top of the config data
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates
499

    
500
  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list).

    """
    minor_map, clashes = self._UnlockedComputeDRBDMap()
    if clashes:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(clashes))
    return minor_map
516

    
517
  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    # refuse to allocate on top of a corrupted minor map
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      # pick the lowest unused minor, or one past the highest used
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      # record the reservation so concurrent allocations see it
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
572

    
573
  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    # collect the matching keys first, then delete, so we never mutate
    # the dict while scanning it
    stale = [key for (key, name) in self._temporary_drbds.items()
             if name == instance]
    for key in stale:
      del self._temporary_drbds[key]
586

    
587
  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a locked wrapper over
    L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)
603

    
604
  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    version = self._config_data.version
    return version
612

    
613
  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    cluster = self._config_data.cluster
    return cluster.cluster_name
621

    
622
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    cluster = self._config_data.cluster
    return cluster.master_node
630

    
631
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    cluster = self._config_data.cluster
    return cluster.master_ip
639

    
640
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    cluster = self._config_data.cluster
    return cluster.master_netdev
646

    
647
  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    cluster = self._config_data.cluster
    return cluster.file_storage_dir
653

    
654
  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    cluster = self._config_data.cluster
    return cluster.default_hypervisor
660

    
661
  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    cluster = self._config_data.cluster
    return cluster.rsahostkeypub
670

    
671
  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name,
                   instance.MapLVsByNode())

    instance.serial_no = 1
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    # the instance is now committed, so drop its temporary reservations
    self._UnlockedReleaseDRBDMinors(instance.name)
    for nic in instance.nics:
      self._temporary_macs.discard(nic.mac)
    self._WriteConfig()
695

    
696
  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert isinstance(status, bool), \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    try:
      instance = self._config_data.instances[instance_name]
    except KeyError:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    # only write the config when the status actually changes
    if instance.admin_up == status:
      return
    instance.admin_up = status
    instance.serial_no += 1
    self._WriteConfig()
711

    
712
  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    """
    self._SetInstanceStatus(instance_name, status=True)
718

    
719
  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    instances = self._config_data.instances
    if instance_name not in instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    instances.pop(instance_name)
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
729

    
730
  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    # re-key the instance under its new name
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        # (the on-disk layout is <storage_dir>/<instance_name>/<iv_name>,
        # so the instance-name path component must follow the rename)
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              os.path.join(file_storage_dir,
                                                           inst.name,
                                                           disk.iv_name))

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()
756

    
757
  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    """
    self._SetInstanceStatus(instance_name, status=False)
763

    
764
  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    instances = self._config_data.instances
    return instances.keys()
771

    
772
  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    names = self._UnlockedGetInstanceList()
    return names
781

    
782
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    candidates = self._config_data.instances.keys()
    return utils.MatchNameComponent(short_name, candidates)
789

    
790
  def _UnlockedGetInstanceInfo(self, instance_name):
    """Return information about an instance, or None if it is unknown.

    This function is for internal use, when the config lock is already held.

    """
    instances = self._config_data.instances
    if instance_name in instances:
      return instances[instance_name]
    return None
800

    
801
  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Return information about an instance.

    It takes the information from the configuration file. Other information of
    an instance are taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)
816

    
817
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @returns: dict of (instance, instance_info), where instance_info is what
              would GetInstanceInfo return for the node

    """
    info = {}
    for name in self._UnlockedGetInstanceList():
      info[name] = self._UnlockedGetInstanceInfo(name)
    return info
829

    
830
  @locking.ssynchronized(_config_lock)
  def AddNode(self, node):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    # FIX: use lazy %-args instead of eager "%" interpolation, so the
    # logging framework formats the message only when it is emitted
    logging.info("Adding node %s to configuration", node.name)

    node.serial_no = 1
    self._config_data.nodes[node.name] = node
    # bump the cluster serial so the change is noticed
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
844

    
845
  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    @raise errors.ConfigurationError: if the node is not in the
        configuration

    """
    # FIX: use lazy %-args instead of eager "%" interpolation
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
858

    
859
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    # matches against the known node names; the original docstring said
    # "instance name", a copy-paste error from ExpandInstanceName
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys())
866

    
867
  def _UnlockedGetNodeInfo(self, node_name):
868
    """Get the configuration of a node, as stored in the config.
869

870
    This function is for internal use, when the config lock is already
871
    held.
872

873
    @param node_name: the node name, e.g. I{node1.example.com}
874

875
    @rtype: L{objects.Node}
876
    @return: the node object
877

878
    """
879
    if node_name not in self._config_data.nodes:
880
      return None
881

    
882
    return self._config_data.nodes[node_name]
883

    
884

    
885
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object, or None if the node is not known

    """
    return self._UnlockedGetNodeInfo(node_name)

  def _UnlockedGetNodeList(self):
900
    """Return the list of nodes which are in the configuration.
901

902
    This function is for internal use, when the config lock is already
903
    held.
904

905
    @rtype: list
906

907
    """
908
    return self._config_data.nodes.keys()
909

    
910

    
911
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This is just a locked wrapper over L{_UnlockedGetNodeList}.

    @rtype: list

    """
    return self._UnlockedGetNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    @rtype: list
    @return: the names of all configured nodes whose C{offline} flag
        is not set

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
        GetNodeInfo would return for that node

    """
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                    for node in self._UnlockedGetNodeList()])
    return my_dict

  def _UnlockedGetMasterCandidateStats(self):
941
    """Get the number of current and maximum desired and possible candidates.
942

943
    @rtype: tuple
944
    @return: tuple of (current, desired and possible)
945

946
    """
947
    mc_now = mc_max = 0
948
    for node in self._config_data.nodes.itervalues():
949
      if not (node.offline or node.drained):
950
        mc_max += 1
951
      if node.master_candidate:
952
        mc_now += 1
953
    mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
954
    return (mc_now, mc_max)
955

    
956
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self):
    """Get the number of current and maximum possible candidates.

    This is just a locked wrapper over
    L{_UnlockedGetMasterCandidateStats}.

    @rtype: tuple
    @return: tuple of (current, max)

    """
    return self._UnlockedGetMasterCandidateStats()

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self):
    """Try to grow the candidate pool to the desired size.

    Nodes are picked in random order from those which are neither
    candidates already nor offline nor drained; the configuration is
    written out only if at least one node was promoted.

    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      # shuffle so that repeated calls don't always promote the same
      # nodes first
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if node.master_candidate or node.offline or node.drained:
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen, as mc_max already excludes
        # offline/drained nodes; the "Warning:" prefix of the original
        # message was dropped as it duplicated the log level
        logging.warning("MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

  def _BumpSerialNo(self):
1002
    """Bump up the serial number of the config.
1003

1004
    """
1005
    self._config_data.serial_no += 1
1006

    
1007
  def _OpenConfig(self):
1008
    """Read the config data from disk.
1009

1010
    """
1011
    f = open(self._cfg_file, 'r')
1012
    try:
1013
      try:
1014
        data = objects.ConfigData.FromDict(serializer.Load(f.read()))
1015
      except Exception, err:
1016
        raise errors.ConfigurationError(err)
1017
    finally:
1018
      f.close()
1019

    
1020
    # Make sure the configuration has the right version
1021
    _ValidateConfig(data)
1022

    
1023
    if (not hasattr(data, 'cluster') or
1024
        not hasattr(data.cluster, 'rsahostkeypub')):
1025
      raise errors.ConfigurationError("Incomplete configuration"
1026
                                      " (missing cluster.rsahostkeypub)")
1027
    self._config_data = data
1028
    # reset the last serial as -1 so that the next write will cause
1029
    # ssconf update
1030
    self._last_cluster_serial = -1
1031

    
1032
  def _DistributeConfig(self):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    @rtype: bool
    @return: True if the copy succeeded on all master candidates (or
        the writer is offline), False otherwise

    """
    if self._offline:
      return True
    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      # only master candidates keep a full copy of the configuration
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    for node in node_list:
      if not result[node]:
        logging.error("copy of file %s to node %s failed",
                      self._cfg_file, node)
        bad = True
    return not bad

  def _WriteConfig(self, destination=None):
    """Write the configuration data to persistent storage.

    The data is first verified for consistency, then written atomically
    (temporary file plus rename), redistributed to the master
    candidates and, if the cluster serial number has changed, the
    ssconf files are pushed to all nodes.

    @param destination: path to write the config to; defaults to the
        writer's standard config file

    @raise errors.ConfigurationError: if the configuration fails the
        consistency check

    """
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      raise errors.ConfigurationError("Configuration data is not"
                                      " consistent: %s" %
                                      (", ".join(config_errors)))
    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())
    dir_name, file_name = os.path.split(destination)
    fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
    f = os.fdopen(fd, 'w')
    try:
      f.write(txt)
      # fsync before rename so the new contents are on disk
      os.fsync(f.fileno())
    finally:
      f.close()
    # we don't need to do os.close(fd) as f.close() did it
    os.rename(name, destination)
    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig()

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
                                              self._UnlockedGetSsconfValues())
      self._last_cluster_serial = self._config_data.cluster.serial_no

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]

    # each ssconf value is a newline-joined list of names
    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    node_data = fn(node_names)

    cluster = self._config_data.cluster
    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      }

  @locking.ssynchronized(_config_lock)
  def InitConfig(self, version, cluster_config, master_node_config):
    """Create the initial cluster configuration.

    It will contain the current node, which will also be the master
    node, and no instances.

    @type version: int
    @param version: Configuration version
    @type cluster_config: objects.Cluster
    @param cluster_config: Cluster configuration
    @type master_node_config: objects.Node
    @param master_node_config: Master node configuration

    """
    nodes = {
      master_node_config.name: master_node_config,
      }

    self._config_data = objects.ConfigData(version=version,
                                           cluster=cluster_config,
                                           nodes=nodes,
                                           instances={},
                                           serial_no=1)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    @rtype: str

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    The change bumps the cluster serial number and is written out
    immediately.

    @param vg_name: the new volume group name

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefBridge(self):
    """Return the default bridge.

    @rtype: str

    """
    return self._config_data.cluster.default_bridge

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the mac prefix.

    @rtype: str

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Return information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock)
  def Update(self, target):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure that it's saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster

    @raise errors.ProgrammerError: if the config has not been read or
        the target has an unsupported type
    @raise errors.ConfigurationError: if the target is not the object
        stored in the current configuration

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    # verify that the target is the very object we have in the config,
    # not merely an equal copy
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1

    if isinstance(target, objects.Instance):
      # the instance's DRBD minors and temporary MACs are now recorded
      # in the config proper, so drop the reservations
      self._UnlockedReleaseDRBDMinors(target.name)
      for nic in target.nics:
        self._temporary_macs.discard(nic.mac)

    self._WriteConfig()