Unify the “--backend-parameters” option
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import tempfile
36 import random
37 import logging
38 import time
39
40 from ganeti import errors
41 from ganeti import locking
42 from ganeti import utils
43 from ganeti import constants
44 from ganeti import rpc
45 from ganeti import objects
46 from ganeti import serializer
47
48
49 _config_lock = locking.SharedLock()
50
51
52 def _ValidateConfig(data):
53   """Verifies that a configuration objects looks valid.
54
55   This only verifies the version of the configuration.
56
57   @raise errors.ConfigurationError: if the version differs from what
58       we expect
59
60   """
61   if data.version != constants.CONFIG_VERSION:
62     raise errors.ConfigurationError("Cluster configuration version"
63                                     " mismatch, got %s instead of %s" %
64                                     (data.version,
65                                      constants.CONFIG_VERSION))
66
67
68 class ConfigWriter:
69   """The interface to the cluster configuration.
70
71   """
72   def __init__(self, cfg_file=None, offline=False):
73     self.write_count = 0
74     self._lock = _config_lock
75     self._config_data = None
76     self._offline = offline
77     if cfg_file is None:
78       self._cfg_file = constants.CLUSTER_CONF_FILE
79     else:
80       self._cfg_file = cfg_file
81     self._temporary_ids = set()
82     self._temporary_drbds = {}
83     self._temporary_macs = set()
84     # Note: in order to prevent errors when resolving our name in
85     # _DistributeConfig, we compute it here once and reuse it; it's
86     # better to raise an error before starting to modify the config
87     # file than after it was modified
88     self._my_hostname = utils.HostInfo().name
89     self._last_cluster_serial = -1
90     self._OpenConfig()
91
92   # this method needs to be static, so that we can call it on the class
93   @staticmethod
94   def IsCluster():
95     """Check if the cluster is configured.
96
97     """
98     return os.path.exists(constants.CLUSTER_CONF_FILE)
99
100   @locking.ssynchronized(_config_lock, shared=1)
101   def GenerateMAC(self):
102     """Generate a MAC for an instance.
103
104     This should check the current instances for duplicates.
105
106     """
107     prefix = self._config_data.cluster.mac_prefix
108     all_macs = self._AllMACs()
109     retries = 64
110     while retries > 0:
111       byte1 = random.randrange(0, 256)
112       byte2 = random.randrange(0, 256)
113       byte3 = random.randrange(0, 256)
114       mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
115       if mac not in all_macs and mac not in self._temporary_macs:
116         break
117       retries -= 1
118     else:
119       raise errors.ConfigurationError("Can't generate unique MAC")
120     self._temporary_macs.add(mac)
121     return mac
122
123   @locking.ssynchronized(_config_lock, shared=1)
124   def IsMacInUse(self, mac):
125     """Predicate: check if the specified MAC is in use in the Ganeti cluster.
126
127     This only checks instances managed by this cluster, it does not
128     check for potential collisions elsewhere.
129
130     """
131     all_macs = self._AllMACs()
132     return mac in all_macs or mac in self._temporary_macs
133
134   @locking.ssynchronized(_config_lock, shared=1)
135   def GenerateDRBDSecret(self):
136     """Generate a DRBD secret.
137
138     This checks the current disks for duplicates.
139
140     """
141     all_secrets = self._AllDRBDSecrets()
142     retries = 64
143     while retries > 0:
144       secret = utils.GenerateSecret()
145       if secret not in all_secrets:
146         break
147       retries -= 1
148     else:
149       raise errors.ConfigurationError("Can't generate unique DRBD secret")
150     return secret
151
152   def _ComputeAllLVs(self):
153     """Compute the list of all LVs.
154
155     """
156     lvnames = set()
157     for instance in self._config_data.instances.values():
158       node_data = instance.MapLVsByNode()
159       for lv_list in node_data.values():
160         lvnames.update(lv_list)
161     return lvnames
162
163   @locking.ssynchronized(_config_lock, shared=1)
164   def GenerateUniqueID(self, exceptions=None):
165     """Generate an unique disk name.
166
167     This checks the current node, instances and disk names for
168     duplicates.
169
170     @param exceptions: a list with some other names which should be checked
171         for uniqueness (used for example when you want to get
172         more than one id at one time without adding each one in
173         turn to the config file)
174
175     @rtype: string
176     @return: the unique id
177
178     """
179     existing = set()
180     existing.update(self._temporary_ids)
181     existing.update(self._ComputeAllLVs())
182     existing.update(self._config_data.instances.keys())
183     existing.update(self._config_data.nodes.keys())
184     if exceptions is not None:
185       existing.update(exceptions)
186     retries = 64
187     while retries > 0:
188       unique_id = utils.NewUUID()
189       if unique_id not in existing and unique_id is not None:
190         break
191     else:
192       raise errors.ConfigurationError("Not able generate an unique ID"
193                                       " (last tried ID: %s" % unique_id)
194     self._temporary_ids.add(unique_id)
195     return unique_id
196
197   def _AllMACs(self):
198     """Return all MACs present in the config.
199
200     @rtype: list
201     @return: the list of all MACs
202
203     """
204     result = []
205     for instance in self._config_data.instances.values():
206       for nic in instance.nics:
207         result.append(nic.mac)
208
209     return result
210
211   def _AllDRBDSecrets(self):
212     """Return all DRBD secrets present in the config.
213
214     @rtype: list
215     @return: the list of all DRBD secrets
216
217     """
218     def helper(disk, result):
219       """Recursively gather secrets from this disk."""
220       if disk.dev_type == constants.DT_DRBD8:
221         result.append(disk.logical_id[5])
222       if disk.children:
223         for child in disk.children:
224           helper(child, result)
225
226     result = []
227     for instance in self._config_data.instances.values():
228       for disk in instance.disks:
229         helper(disk, result)
230
231     return result
232
233   def _CheckDiskIDs(self, disk, l_ids, p_ids):
234     """Compute duplicate disk IDs
235
236     @type disk: L{objects.Disk}
237     @param disk: the disk at which to start searching
238     @type l_ids: list
239     @param l_ids: list of current logical ids
240     @type p_ids: list
241     @param p_ids: list of current physical ids
242     @rtype: list
243     @return: a list of error messages
244
245     """
246     result = []
247     if disk.logical_id is not None:
248       if disk.logical_id in l_ids:
249         result.append("duplicate logical id %s" % str(disk.logical_id))
250       else:
251         l_ids.append(disk.logical_id)
252     if disk.physical_id is not None:
253       if disk.physical_id in p_ids:
254         result.append("duplicate physical id %s" % str(disk.physical_id))
255       else:
256         p_ids.append(disk.physical_id)
257
258     if disk.children:
259       for child in disk.children:
260         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
261     return result
262
263   def _UnlockedVerifyConfig(self):
264     """Verify function.
265
266     @rtype: list
267     @return: a list of error messages; a non-empty list signifies
268         configuration errors
269
270     """
271     result = []
272     seen_macs = []
273     ports = {}
274     data = self._config_data
275     seen_lids = []
276     seen_pids = []
277
278     # global cluster checks
279     if not data.cluster.enabled_hypervisors:
280       result.append("enabled hypervisors list doesn't have any entries")
281     invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
282     if invalid_hvs:
283       result.append("enabled hypervisors contains invalid entries: %s" %
284                     invalid_hvs)
285
286     if data.cluster.master_node not in data.nodes:
287       result.append("cluster has invalid primary node '%s'" %
288                     data.cluster.master_node)
289
290     # per-instance checks
291     for instance_name in data.instances:
292       instance = data.instances[instance_name]
293       if instance.primary_node not in data.nodes:
294         result.append("instance '%s' has invalid primary node '%s'" %
295                       (instance_name, instance.primary_node))
296       for snode in instance.secondary_nodes:
297         if snode not in data.nodes:
298           result.append("instance '%s' has invalid secondary node '%s'" %
299                         (instance_name, snode))
300       for idx, nic in enumerate(instance.nics):
301         if nic.mac in seen_macs:
302           result.append("instance '%s' has NIC %d mac %s duplicate" %
303                         (instance_name, idx, nic.mac))
304         else:
305           seen_macs.append(nic.mac)
306
307       # gather the drbd ports for duplicate checks
308       for dsk in instance.disks:
309         if dsk.dev_type in constants.LDS_DRBD:
310           tcp_port = dsk.logical_id[2]
311           if tcp_port not in ports:
312             ports[tcp_port] = []
313           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
314       # gather network port reservation
315       net_port = getattr(instance, "network_port", None)
316       if net_port is not None:
317         if net_port not in ports:
318           ports[net_port] = []
319         ports[net_port].append((instance.name, "network port"))
320
321       # instance disk verify
322       for idx, disk in enumerate(instance.disks):
323         result.extend(["instance '%s' disk %d error: %s" %
324                        (instance.name, idx, msg) for msg in disk.Verify()])
325         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
326
327     # cluster-wide pool of free ports
328     for free_port in data.cluster.tcpudp_port_pool:
329       if free_port not in ports:
330         ports[free_port] = []
331       ports[free_port].append(("cluster", "port marked as free"))
332
333     # compute tcp/udp duplicate ports
334     keys = ports.keys()
335     keys.sort()
336     for pnum in keys:
337       pdata = ports[pnum]
338       if len(pdata) > 1:
339         txt = ", ".join(["%s/%s" % val for val in pdata])
340         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
341
342     # highest used tcp port check
343     if keys:
344       if keys[-1] > data.cluster.highest_used_port:
345         result.append("Highest used port mismatch, saved %s, computed %s" %
346                       (data.cluster.highest_used_port, keys[-1]))
347
348     if not data.nodes[data.cluster.master_node].master_candidate:
349       result.append("Master node is not a master candidate")
350
351     # master candidate checks
352     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
353     if mc_now < mc_max:
354       result.append("Not enough master candidates: actual %d, target %d" %
355                     (mc_now, mc_max))
356
357     # node checks
358     for node in data.nodes.values():
359       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
360         result.append("Node %s state is invalid: master_candidate=%s,"
361                       " drain=%s, offline=%s" %
362                       (node.name, node.master_candidate, node.drain,
363                        node.offline))
364
365     # drbd minors check
366     d_map, duplicates = self._UnlockedComputeDRBDMap()
367     for node, minor, instance_a, instance_b in duplicates:
368       result.append("DRBD minor %d on node %s is assigned twice to instances"
369                     " %s and %s" % (minor, node, instance_a, instance_b))
370
371     return result
372
373   @locking.ssynchronized(_config_lock, shared=1)
374   def VerifyConfig(self):
375     """Verify function.
376
377     This is just a wrapper over L{_UnlockedVerifyConfig}.
378
379     @rtype: list
380     @return: a list of error messages; a non-empty list signifies
381         configuration errors
382
383     """
384     return self._UnlockedVerifyConfig()
385
386   def _UnlockedSetDiskID(self, disk, node_name):
387     """Convert the unique ID to the ID needed on the target nodes.
388
389     This is used only for drbd, which needs ip/port configuration.
390
391     The routine descends down and updates its children also, because
392     this helps when the only the top device is passed to the remote
393     node.
394
395     This function is for internal use, when the config lock is already held.
396
397     """
398     if disk.children:
399       for child in disk.children:
400         self._UnlockedSetDiskID(child, node_name)
401
402     if disk.logical_id is None and disk.physical_id is not None:
403       return
404     if disk.dev_type == constants.LD_DRBD8:
405       pnode, snode, port, pminor, sminor, secret = disk.logical_id
406       if node_name not in (pnode, snode):
407         raise errors.ConfigurationError("DRBD device not knowing node %s" %
408                                         node_name)
409       pnode_info = self._UnlockedGetNodeInfo(pnode)
410       snode_info = self._UnlockedGetNodeInfo(snode)
411       if pnode_info is None or snode_info is None:
412         raise errors.ConfigurationError("Can't find primary or secondary node"
413                                         " for %s" % str(disk))
414       p_data = (pnode_info.secondary_ip, port)
415       s_data = (snode_info.secondary_ip, port)
416       if pnode == node_name:
417         disk.physical_id = p_data + s_data + (pminor, secret)
418       else: # it must be secondary, we tested above
419         disk.physical_id = s_data + p_data + (sminor, secret)
420     else:
421       disk.physical_id = disk.logical_id
422     return
423
424   @locking.ssynchronized(_config_lock)
425   def SetDiskID(self, disk, node_name):
426     """Convert the unique ID to the ID needed on the target nodes.
427
428     This is used only for drbd, which needs ip/port configuration.
429
430     The routine descends down and updates its children also, because
431     this helps when the only the top device is passed to the remote
432     node.
433
434     """
435     return self._UnlockedSetDiskID(disk, node_name)
436
437   @locking.ssynchronized(_config_lock)
438   def AddTcpUdpPort(self, port):
439     """Adds a new port to the available port pool.
440
441     """
442     if not isinstance(port, int):
443       raise errors.ProgrammerError("Invalid type passed for port")
444
445     self._config_data.cluster.tcpudp_port_pool.add(port)
446     self._WriteConfig()
447
448   @locking.ssynchronized(_config_lock, shared=1)
449   def GetPortList(self):
450     """Returns a copy of the current port list.
451
452     """
453     return self._config_data.cluster.tcpudp_port_pool.copy()
454
455   @locking.ssynchronized(_config_lock)
456   def AllocatePort(self):
457     """Allocate a port.
458
459     The port will be taken from the available port pool or from the
460     default port range (and in this case we increase
461     highest_used_port).
462
463     """
464     # If there are TCP/IP ports configured, we use them first.
465     if self._config_data.cluster.tcpudp_port_pool:
466       port = self._config_data.cluster.tcpudp_port_pool.pop()
467     else:
468       port = self._config_data.cluster.highest_used_port + 1
469       if port >= constants.LAST_DRBD_PORT:
470         raise errors.ConfigurationError("The highest used port is greater"
471                                         " than %s. Aborting." %
472                                         constants.LAST_DRBD_PORT)
473       self._config_data.cluster.highest_used_port = port
474
475     self._WriteConfig()
476     return port
477
478   def _UnlockedComputeDRBDMap(self):
479     """Compute the used DRBD minor/nodes.
480
481     @rtype: (dict, list)
482     @return: dictionary of node_name: dict of minor: instance_name;
483         the returned dict will have all the nodes in it (even if with
484         an empty list), and a list of duplicates; if the duplicates
485         list is not empty, the configuration is corrupted and its caller
486         should raise an exception
487
488     """
489     def _AppendUsedPorts(instance_name, disk, used):
490       duplicates = []
491       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
492         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
493         for node, port in ((node_a, minor_a), (node_b, minor_b)):
494           assert node in used, ("Node '%s' of instance '%s' not found"
495                                 " in node list" % (node, instance_name))
496           if port in used[node]:
497             duplicates.append((node, port, instance_name, used[node][port]))
498           else:
499             used[node][port] = instance_name
500       if disk.children:
501         for child in disk.children:
502           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
503       return duplicates
504
505     duplicates = []
506     my_dict = dict((node, {}) for node in self._config_data.nodes)
507     for instance in self._config_data.instances.itervalues():
508       for disk in instance.disks:
509         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
510     for (node, minor), instance in self._temporary_drbds.iteritems():
511       if minor in my_dict[node] and my_dict[node][minor] != instance:
512         duplicates.append((node, minor, instance, my_dict[node][minor]))
513       else:
514         my_dict[node][minor] = instance
515     return my_dict, duplicates
516
517   @locking.ssynchronized(_config_lock)
518   def ComputeDRBDMap(self):
519     """Compute the used DRBD minor/nodes.
520
521     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
522
523     @return: dictionary of node_name: dict of minor: instance_name;
524         the returned dict will have all the nodes in it (even if with
525         an empty list).
526
527     """
528     d_map, duplicates = self._UnlockedComputeDRBDMap()
529     if duplicates:
530       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
531                                       str(duplicates))
532     return d_map
533
534   @locking.ssynchronized(_config_lock)
535   def AllocateDRBDMinor(self, nodes, instance):
536     """Allocate a drbd minor.
537
538     The free minor will be automatically computed from the existing
539     devices. A node can be given multiple times in order to allocate
540     multiple minors. The result is the list of minors, in the same
541     order as the passed nodes.
542
543     @type instance: string
544     @param instance: the instance for which we allocate minors
545
546     """
547     assert isinstance(instance, basestring), \
548            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
549
550     d_map, duplicates = self._UnlockedComputeDRBDMap()
551     if duplicates:
552       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
553                                       str(duplicates))
554     result = []
555     for nname in nodes:
556       ndata = d_map[nname]
557       if not ndata:
558         # no minors used, we can start at 0
559         result.append(0)
560         ndata[0] = instance
561         self._temporary_drbds[(nname, 0)] = instance
562         continue
563       keys = ndata.keys()
564       keys.sort()
565       ffree = utils.FirstFree(keys)
566       if ffree is None:
567         # return the next minor
568         # TODO: implement high-limit check
569         minor = keys[-1] + 1
570       else:
571         minor = ffree
572       # double-check minor against current instances
573       assert minor not in d_map[nname], \
574              ("Attempt to reuse allocated DRBD minor %d on node %s,"
575               " already allocated to instance %s" %
576               (minor, nname, d_map[nname][minor]))
577       ndata[minor] = instance
578       # double-check minor against reservation
579       r_key = (nname, minor)
580       assert r_key not in self._temporary_drbds, \
581              ("Attempt to reuse reserved DRBD minor %d on node %s,"
582               " reserved for instance %s" %
583               (minor, nname, self._temporary_drbds[r_key]))
584       self._temporary_drbds[r_key] = instance
585       result.append(minor)
586     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
587                   nodes, result)
588     return result
589
590   def _UnlockedReleaseDRBDMinors(self, instance):
591     """Release temporary drbd minors allocated for a given instance.
592
593     @type instance: string
594     @param instance: the instance for which temporary minors should be
595                      released
596
597     """
598     assert isinstance(instance, basestring), \
599            "Invalid argument passed to ReleaseDRBDMinors"
600     for key, name in self._temporary_drbds.items():
601       if name == instance:
602         del self._temporary_drbds[key]
603
604   @locking.ssynchronized(_config_lock)
605   def ReleaseDRBDMinors(self, instance):
606     """Release temporary drbd minors allocated for a given instance.
607
608     This should be called on the error paths, on the success paths
609     it's automatically called by the ConfigWriter add and update
610     functions.
611
612     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
613
614     @type instance: string
615     @param instance: the instance for which temporary minors should be
616                      released
617
618     """
619     self._UnlockedReleaseDRBDMinors(instance)
620
621   @locking.ssynchronized(_config_lock, shared=1)
622   def GetConfigVersion(self):
623     """Get the configuration version.
624
625     @return: Config version
626
627     """
628     return self._config_data.version
629
630   @locking.ssynchronized(_config_lock, shared=1)
631   def GetClusterName(self):
632     """Get cluster name.
633
634     @return: Cluster name
635
636     """
637     return self._config_data.cluster.cluster_name
638
639   @locking.ssynchronized(_config_lock, shared=1)
640   def GetMasterNode(self):
641     """Get the hostname of the master node for this cluster.
642
643     @return: Master hostname
644
645     """
646     return self._config_data.cluster.master_node
647
648   @locking.ssynchronized(_config_lock, shared=1)
649   def GetMasterIP(self):
650     """Get the IP of the master node for this cluster.
651
652     @return: Master IP
653
654     """
655     return self._config_data.cluster.master_ip
656
657   @locking.ssynchronized(_config_lock, shared=1)
658   def GetMasterNetdev(self):
659     """Get the master network device for this cluster.
660
661     """
662     return self._config_data.cluster.master_netdev
663
664   @locking.ssynchronized(_config_lock, shared=1)
665   def GetFileStorageDir(self):
666     """Get the file storage dir for this cluster.
667
668     """
669     return self._config_data.cluster.file_storage_dir
670
671   @locking.ssynchronized(_config_lock, shared=1)
672   def GetHypervisorType(self):
673     """Get the hypervisor type for this cluster.
674
675     """
676     return self._config_data.cluster.enabled_hypervisors[0]
677
678   @locking.ssynchronized(_config_lock, shared=1)
679   def GetHostKey(self):
680     """Return the rsa hostkey from the config.
681
682     @rtype: string
683     @return: the rsa hostkey
684
685     """
686     return self._config_data.cluster.rsahostkeypub
687
688   @locking.ssynchronized(_config_lock)
689   def AddInstance(self, instance):
690     """Add an instance to the config.
691
692     This should be used after creating a new instance.
693
694     @type instance: L{objects.Instance}
695     @param instance: the instance object
696
697     """
698     if not isinstance(instance, objects.Instance):
699       raise errors.ProgrammerError("Invalid type passed to AddInstance")
700
701     if instance.disk_template != constants.DT_DISKLESS:
702       all_lvs = instance.MapLVsByNode()
703       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
704
705     all_macs = self._AllMACs()
706     for nic in instance.nics:
707       if nic.mac in all_macs:
708         raise errors.ConfigurationError("Cannot add instance %s:"
709           " MAC address '%s' already in use." % (instance.name, nic.mac))
710
711     instance.serial_no = 1
712     instance.ctime = instance.mtime = time.time()
713     self._config_data.instances[instance.name] = instance
714     self._config_data.cluster.serial_no += 1
715     self._UnlockedReleaseDRBDMinors(instance.name)
716     for nic in instance.nics:
717       self._temporary_macs.discard(nic.mac)
718     self._WriteConfig()
719
720   def _SetInstanceStatus(self, instance_name, status):
721     """Set the instance's status to a given value.
722
723     """
724     assert isinstance(status, bool), \
725            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
726
727     if instance_name not in self._config_data.instances:
728       raise errors.ConfigurationError("Unknown instance '%s'" %
729                                       instance_name)
730     instance = self._config_data.instances[instance_name]
731     if instance.admin_up != status:
732       instance.admin_up = status
733       instance.serial_no += 1
734       instance.mtime = time.time()
735       self._WriteConfig()
736
737   @locking.ssynchronized(_config_lock)
738   def MarkInstanceUp(self, instance_name):
739     """Mark the instance status to up in the config.
740
741     """
742     self._SetInstanceStatus(instance_name, True)
743
744   @locking.ssynchronized(_config_lock)
745   def RemoveInstance(self, instance_name):
746     """Remove the instance from the configuration.
747
748     """
749     if instance_name not in self._config_data.instances:
750       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
751     del self._config_data.instances[instance_name]
752     self._config_data.cluster.serial_no += 1
753     self._WriteConfig()
754
755   @locking.ssynchronized(_config_lock)
756   def RenameInstance(self, old_name, new_name):
757     """Rename an instance.
758
759     This needs to be done in ConfigWriter and not by RemoveInstance
760     combined with AddInstance as only we can guarantee an atomic
761     rename.
762
763     """
764     if old_name not in self._config_data.instances:
765       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
766     inst = self._config_data.instances[old_name]
767     del self._config_data.instances[old_name]
768     inst.name = new_name
769
770     for disk in inst.disks:
771       if disk.dev_type == constants.LD_FILE:
772         # rename the file paths in logical and physical id
773         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
774         disk.physical_id = disk.logical_id = (disk.logical_id[0],
775                                               os.path.join(file_storage_dir,
776                                                            inst.name,
777                                                            disk.iv_name))
778
779     self._config_data.instances[inst.name] = inst
780     self._WriteConfig()
781
782   @locking.ssynchronized(_config_lock)
783   def MarkInstanceDown(self, instance_name):
784     """Mark the status of an instance to down in the configuration.
785
786     """
787     self._SetInstanceStatus(instance_name, False)
788
789   def _UnlockedGetInstanceList(self):
790     """Get the list of instances.
791
792     This function is for internal use, when the config lock is already held.
793
794     """
795     return self._config_data.instances.keys()
796
797   @locking.ssynchronized(_config_lock, shared=1)
798   def GetInstanceList(self):
799     """Get the list of instances.
800
801     @return: array of instances, ex. ['instance2.example.com',
802         'instance1.example.com']
803
804     """
805     return self._UnlockedGetInstanceList()
806
807   @locking.ssynchronized(_config_lock, shared=1)
808   def ExpandInstanceName(self, short_name):
809     """Attempt to expand an incomplete instance name.
810
811     """
812     return utils.MatchNameComponent(short_name,
813                                     self._config_data.instances.keys())
814
815   def _UnlockedGetInstanceInfo(self, instance_name):
816     """Returns information about an instance.
817
818     This function is for internal use, when the config lock is already held.
819
820     """
821     if instance_name not in self._config_data.instances:
822       return None
823
824     return self._config_data.instances[instance_name]
825
826   @locking.ssynchronized(_config_lock, shared=1)
827   def GetInstanceInfo(self, instance_name):
828     """Returns information about an instance.
829
830     It takes the information from the configuration file. Other information of
831     an instance are taken from the live systems.
832
833     @param instance_name: name of the instance, e.g.
834         I{instance1.example.com}
835
836     @rtype: L{objects.Instance}
837     @return: the instance object
838
839     """
840     return self._UnlockedGetInstanceInfo(instance_name)
841
842   @locking.ssynchronized(_config_lock, shared=1)
843   def GetAllInstancesInfo(self):
844     """Get the configuration of all instances.
845
846     @rtype: dict
847     @return: dict of (instance, instance_info), where instance_info is what
848               would GetInstanceInfo return for the node
849
850     """
851     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
852                     for instance in self._UnlockedGetInstanceList()])
853     return my_dict
854
855   @locking.ssynchronized(_config_lock)
856   def AddNode(self, node):
857     """Add a node to the configuration.
858
859     @type node: L{objects.Node}
860     @param node: a Node instance
861
862     """
863     logging.info("Adding node %s to configuration" % node.name)
864
865     node.serial_no = 1
866     node.ctime = node.mtime = time.time()
867     self._config_data.nodes[node.name] = node
868     self._config_data.cluster.serial_no += 1
869     self._WriteConfig()
870
871   @locking.ssynchronized(_config_lock)
872   def RemoveNode(self, node_name):
873     """Remove a node from the configuration.
874
875     """
876     logging.info("Removing node %s from configuration" % node_name)
877
878     if node_name not in self._config_data.nodes:
879       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
880
881     del self._config_data.nodes[node_name]
882     self._config_data.cluster.serial_no += 1
883     self._WriteConfig()
884
885   @locking.ssynchronized(_config_lock, shared=1)
886   def ExpandNodeName(self, short_name):
887     """Attempt to expand an incomplete instance name.
888
889     """
890     return utils.MatchNameComponent(short_name,
891                                     self._config_data.nodes.keys())
892
893   def _UnlockedGetNodeInfo(self, node_name):
894     """Get the configuration of a node, as stored in the config.
895
896     This function is for internal use, when the config lock is already
897     held.
898
899     @param node_name: the node name, e.g. I{node1.example.com}
900
901     @rtype: L{objects.Node}
902     @return: the node object
903
904     """
905     if node_name not in self._config_data.nodes:
906       return None
907
908     return self._config_data.nodes[node_name]
909
910
911   @locking.ssynchronized(_config_lock, shared=1)
912   def GetNodeInfo(self, node_name):
913     """Get the configuration of a node, as stored in the config.
914
915     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
916
917     @param node_name: the node name, e.g. I{node1.example.com}
918
919     @rtype: L{objects.Node}
920     @return: the node object
921
922     """
923     return self._UnlockedGetNodeInfo(node_name)
924
925   def _UnlockedGetNodeList(self):
926     """Return the list of nodes which are in the configuration.
927
928     This function is for internal use, when the config lock is already
929     held.
930
931     @rtype: list
932
933     """
934     return self._config_data.nodes.keys()
935
936
937   @locking.ssynchronized(_config_lock, shared=1)
938   def GetNodeList(self):
939     """Return the list of nodes which are in the configuration.
940
941     """
942     return self._UnlockedGetNodeList()
943
944   @locking.ssynchronized(_config_lock, shared=1)
945   def GetOnlineNodeList(self):
946     """Return the list of nodes which are online.
947
948     """
949     all_nodes = [self._UnlockedGetNodeInfo(node)
950                  for node in self._UnlockedGetNodeList()]
951     return [node.name for node in all_nodes if not node.offline]
952
953   @locking.ssynchronized(_config_lock, shared=1)
954   def GetAllNodesInfo(self):
955     """Get the configuration of all nodes.
956
957     @rtype: dict
958     @return: dict of (node, node_info), where node_info is what
959               would GetNodeInfo return for the node
960
961     """
962     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
963                     for node in self._UnlockedGetNodeList()])
964     return my_dict
965
966   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
967     """Get the number of current and maximum desired and possible candidates.
968
969     @type exceptions: list
970     @param exceptions: if passed, list of nodes that should be ignored
971     @rtype: tuple
972     @return: tuple of (current, desired and possible)
973
974     """
975     mc_now = mc_max = 0
976     for node in self._config_data.nodes.values():
977       if exceptions and node.name in exceptions:
978         continue
979       if not (node.offline or node.drained):
980         mc_max += 1
981       if node.master_candidate:
982         mc_now += 1
983     mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
984     return (mc_now, mc_max)
985
986   @locking.ssynchronized(_config_lock, shared=1)
987   def GetMasterCandidateStats(self, exceptions=None):
988     """Get the number of current and maximum possible candidates.
989
990     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
991
992     @type exceptions: list
993     @param exceptions: if passed, list of nodes that should be ignored
994     @rtype: tuple
995     @return: tuple of (current, max)
996
997     """
998     return self._UnlockedGetMasterCandidateStats(exceptions)
999
1000   @locking.ssynchronized(_config_lock)
1001   def MaintainCandidatePool(self):
1002     """Try to grow the candidate pool to the desired size.
1003
1004     @rtype: list
1005     @return: list with the adjusted nodes (L{objects.Node} instances)
1006
1007     """
1008     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
1009     mod_list = []
1010     if mc_now < mc_max:
1011       node_list = self._config_data.nodes.keys()
1012       random.shuffle(node_list)
1013       for name in node_list:
1014         if mc_now >= mc_max:
1015           break
1016         node = self._config_data.nodes[name]
1017         if node.master_candidate or node.offline or node.drained:
1018           continue
1019         mod_list.append(node)
1020         node.master_candidate = True
1021         node.serial_no += 1
1022         mc_now += 1
1023       if mc_now != mc_max:
1024         # this should not happen
1025         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1026                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1027       if mod_list:
1028         self._config_data.cluster.serial_no += 1
1029         self._WriteConfig()
1030
1031     return mod_list
1032
1033   def _BumpSerialNo(self):
1034     """Bump up the serial number of the config.
1035
1036     """
1037     self._config_data.serial_no += 1
1038     self._config_data.mtime = time.time()
1039
1040   def _OpenConfig(self):
1041     """Read the config data from disk.
1042
1043     """
1044     raw_data = utils.ReadFile(self._cfg_file)
1045
1046     try:
1047       data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1048     except Exception, err:
1049       raise errors.ConfigurationError(err)
1050
1051     # Make sure the configuration has the right version
1052     _ValidateConfig(data)
1053
1054     if (not hasattr(data, 'cluster') or
1055         not hasattr(data.cluster, 'rsahostkeypub')):
1056       raise errors.ConfigurationError("Incomplete configuration"
1057                                       " (missing cluster.rsahostkeypub)")
1058     self._config_data = data
1059     # reset the last serial as -1 so that the next write will cause
1060     # ssconf update
1061     self._last_cluster_serial = -1
1062
1063   def _DistributeConfig(self):
1064     """Distribute the configuration to the other nodes.
1065
1066     Currently, this only copies the configuration file. In the future,
1067     it could be used to encapsulate the 2/3-phase update mechanism.
1068
1069     """
1070     if self._offline:
1071       return True
1072     bad = False
1073
1074     node_list = []
1075     addr_list = []
1076     myhostname = self._my_hostname
1077     # we can skip checking whether _UnlockedGetNodeInfo returns None
1078     # since the node list comes from _UnlocketGetNodeList, and we are
1079     # called with the lock held, so no modifications should take place
1080     # in between
1081     for node_name in self._UnlockedGetNodeList():
1082       if node_name == myhostname:
1083         continue
1084       node_info = self._UnlockedGetNodeInfo(node_name)
1085       if not node_info.master_candidate:
1086         continue
1087       node_list.append(node_info.name)
1088       addr_list.append(node_info.primary_ip)
1089
1090     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1091                                             address_list=addr_list)
1092     for to_node, to_result in result.items():
1093       msg = to_result.fail_msg
1094       if msg:
1095         msg = ("Copy of file %s to node %s failed: %s" %
1096                (self._cfg_file, to_node, msg))
1097         logging.error(msg)
1098         bad = True
1099     return not bad
1100
1101   def _WriteConfig(self, destination=None):
1102     """Write the configuration data to persistent storage.
1103
1104     """
1105     config_errors = self._UnlockedVerifyConfig()
1106     if config_errors:
1107       raise errors.ConfigurationError("Configuration data is not"
1108                                       " consistent: %s" %
1109                                       (", ".join(config_errors)))
1110     if destination is None:
1111       destination = self._cfg_file
1112     self._BumpSerialNo()
1113     txt = serializer.Dump(self._config_data.ToDict())
1114
1115     utils.WriteFile(destination, data=txt)
1116
1117     self.write_count += 1
1118
1119     # and redistribute the config file to master candidates
1120     self._DistributeConfig()
1121
1122     # Write ssconf files on all nodes (including locally)
1123     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1124       if not self._offline:
1125         result = rpc.RpcRunner.call_write_ssconf_files(\
1126           self._UnlockedGetNodeList(),
1127           self._UnlockedGetSsconfValues())
1128         for nname, nresu in result.items():
1129           msg = nresu.fail_msg
1130           if msg:
1131             logging.warning("Error while uploading ssconf files to"
1132                             " node %s: %s", nname, msg)
1133       self._last_cluster_serial = self._config_data.cluster.serial_no
1134
1135   def _UnlockedGetSsconfValues(self):
1136     """Return the values needed by ssconf.
1137
1138     @rtype: dict
1139     @return: a dictionary with keys the ssconf names and values their
1140         associated value
1141
1142     """
1143     fn = "\n".join
1144     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1145     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1146     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1147     node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1148                     for ninfo in node_info]
1149     node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1150                     for ninfo in node_info]
1151
1152     instance_data = fn(instance_names)
1153     off_data = fn(node.name for node in node_info if node.offline)
1154     on_data = fn(node.name for node in node_info if not node.offline)
1155     mc_data = fn(node.name for node in node_info if node.master_candidate)
1156     mc_ips_data = fn(node.primary_ip for node in node_info
1157                      if node.master_candidate)
1158     node_data = fn(node_names)
1159     node_pri_ips_data = fn(node_pri_ips)
1160     node_snd_ips_data = fn(node_snd_ips)
1161
1162     cluster = self._config_data.cluster
1163     cluster_tags = fn(cluster.GetTags())
1164     return {
1165       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1166       constants.SS_CLUSTER_TAGS: cluster_tags,
1167       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1168       constants.SS_MASTER_CANDIDATES: mc_data,
1169       constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1170       constants.SS_MASTER_IP: cluster.master_ip,
1171       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1172       constants.SS_MASTER_NODE: cluster.master_node,
1173       constants.SS_NODE_LIST: node_data,
1174       constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1175       constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1176       constants.SS_OFFLINE_NODES: off_data,
1177       constants.SS_ONLINE_NODES: on_data,
1178       constants.SS_INSTANCE_LIST: instance_data,
1179       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1180       }
1181
1182   @locking.ssynchronized(_config_lock, shared=1)
1183   def GetVGName(self):
1184     """Return the volume group name.
1185
1186     """
1187     return self._config_data.cluster.volume_group_name
1188
1189   @locking.ssynchronized(_config_lock)
1190   def SetVGName(self, vg_name):
1191     """Set the volume group name.
1192
1193     """
1194     self._config_data.cluster.volume_group_name = vg_name
1195     self._config_data.cluster.serial_no += 1
1196     self._WriteConfig()
1197
1198   @locking.ssynchronized(_config_lock, shared=1)
1199   def GetMACPrefix(self):
1200     """Return the mac prefix.
1201
1202     """
1203     return self._config_data.cluster.mac_prefix
1204
1205   @locking.ssynchronized(_config_lock, shared=1)
1206   def GetClusterInfo(self):
1207     """Returns information about the cluster
1208
1209     @rtype: L{objects.Cluster}
1210     @return: the cluster object
1211
1212     """
1213     return self._config_data.cluster
1214
1215   @locking.ssynchronized(_config_lock)
1216   def Update(self, target):
1217     """Notify function to be called after updates.
1218
1219     This function must be called when an object (as returned by
1220     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1221     caller wants the modifications saved to the backing store. Note
1222     that all modified objects will be saved, but the target argument
1223     is the one the caller wants to ensure that it's saved.
1224
1225     @param target: an instance of either L{objects.Cluster},
1226         L{objects.Node} or L{objects.Instance} which is existing in
1227         the cluster
1228
1229     """
1230     if self._config_data is None:
1231       raise errors.ProgrammerError("Configuration file not read,"
1232                                    " cannot save.")
1233     update_serial = False
1234     if isinstance(target, objects.Cluster):
1235       test = target == self._config_data.cluster
1236     elif isinstance(target, objects.Node):
1237       test = target in self._config_data.nodes.values()
1238       update_serial = True
1239     elif isinstance(target, objects.Instance):
1240       test = target in self._config_data.instances.values()
1241     else:
1242       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1243                                    " ConfigWriter.Update" % type(target))
1244     if not test:
1245       raise errors.ConfigurationError("Configuration updated since object"
1246                                       " has been read or unknown object")
1247     target.serial_no += 1
1248     target.mtime = now = time.time()
1249
1250     if update_serial:
1251       # for node updates, we need to increase the cluster serial too
1252       self._config_data.cluster.serial_no += 1
1253       self._config_data.cluster.mtime = now
1254
1255     if isinstance(target, objects.Instance):
1256       self._UnlockedReleaseDRBDMinors(target.name)
1257       for nic in target.nics:
1258         self._temporary_macs.discard(nic.mac)
1259
1260     self._WriteConfig()