Separate the computation of all config IDs
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import tempfile
36 import random
37 import logging
38
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46
47
48 _config_lock = locking.SharedLock()
49
50
51 def _ValidateConfig(data):
52   """Verifies that a configuration objects looks valid.
53
54   This only verifies the version of the configuration.
55
56   @raise errors.ConfigurationError: if the version differs from what
57       we expect
58
59   """
60   if data.version != constants.CONFIG_VERSION:
61     raise errors.ConfigurationError("Cluster configuration version"
62                                     " mismatch, got %s instead of %s" %
63                                     (data.version,
64                                      constants.CONFIG_VERSION))
65
66
67 class ConfigWriter:
68   """The interface to the cluster configuration.
69
70   """
71   def __init__(self, cfg_file=None, offline=False):
72     self.write_count = 0
73     self._lock = _config_lock
74     self._config_data = None
75     self._offline = offline
76     if cfg_file is None:
77       self._cfg_file = constants.CLUSTER_CONF_FILE
78     else:
79       self._cfg_file = cfg_file
80     self._temporary_ids = set()
81     self._temporary_drbds = {}
82     self._temporary_macs = set()
83     # Note: in order to prevent errors when resolving our name in
84     # _DistributeConfig, we compute it here once and reuse it; it's
85     # better to raise an error before starting to modify the config
86     # file than after it was modified
87     self._my_hostname = utils.HostInfo().name
88     self._last_cluster_serial = -1
89     self._OpenConfig()
90
91   # this method needs to be static, so that we can call it on the class
92   @staticmethod
93   def IsCluster():
94     """Check if the cluster is configured.
95
96     """
97     return os.path.exists(constants.CLUSTER_CONF_FILE)
98
99   @locking.ssynchronized(_config_lock, shared=1)
100   def GenerateMAC(self):
101     """Generate a MAC for an instance.
102
103     This should check the current instances for duplicates.
104
105     """
106     prefix = self._config_data.cluster.mac_prefix
107     all_macs = self._AllMACs()
108     retries = 64
109     while retries > 0:
110       byte1 = random.randrange(0, 256)
111       byte2 = random.randrange(0, 256)
112       byte3 = random.randrange(0, 256)
113       mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
114       if mac not in all_macs and mac not in self._temporary_macs:
115         break
116       retries -= 1
117     else:
118       raise errors.ConfigurationError("Can't generate unique MAC")
119     self._temporary_macs.add(mac)
120     return mac
121
122   @locking.ssynchronized(_config_lock, shared=1)
123   def IsMacInUse(self, mac):
124     """Predicate: check if the specified MAC is in use in the Ganeti cluster.
125
126     This only checks instances managed by this cluster, it does not
127     check for potential collisions elsewhere.
128
129     """
130     all_macs = self._AllMACs()
131     return mac in all_macs or mac in self._temporary_macs
132
133   @locking.ssynchronized(_config_lock, shared=1)
134   def GenerateDRBDSecret(self):
135     """Generate a DRBD secret.
136
137     This checks the current disks for duplicates.
138
139     """
140     all_secrets = self._AllDRBDSecrets()
141     retries = 64
142     while retries > 0:
143       secret = utils.GenerateSecret()
144       if secret not in all_secrets:
145         break
146       retries -= 1
147     else:
148       raise errors.ConfigurationError("Can't generate unique DRBD secret")
149     return secret
150
151   def _AllLVs(self):
152     """Compute the list of all LVs.
153
154     """
155     lvnames = set()
156     for instance in self._config_data.instances.values():
157       node_data = instance.MapLVsByNode()
158       for lv_list in node_data.values():
159         lvnames.update(lv_list)
160     return lvnames
161
162   def _AllIDs(self, include_temporary):
163     """Compute the list of all UUIDs and names we have.
164
165     @type include_temporary: boolean
166     @param include_temporary: whether to include the _temporary_ids set
167     @rtype: set
168     @return: a set of IDs
169
170     """
171     existing = set()
172     if include_temporary:
173       existing.update(self._temporary_ids)
174     existing.update(self._AllLVs())
175     existing.update(self._config_data.instances.keys())
176     existing.update(self._config_data.nodes.keys())
177     return existing
178
179   @locking.ssynchronized(_config_lock, shared=1)
180   def GenerateUniqueID(self, exceptions=None):
181     """Generate an unique disk name.
182
183     This checks the current node, instances and disk names for
184     duplicates.
185
186     @param exceptions: a list with some other names which should be checked
187         for uniqueness (used for example when you want to get
188         more than one id at one time without adding each one in
189         turn to the config file)
190
191     @rtype: string
192     @return: the unique id
193
194     """
195     existing = self._AllIDs(include_temporary=True)
196     if exceptions is not None:
197       existing.update(exceptions)
198     retries = 64
199     while retries > 0:
200       unique_id = utils.NewUUID()
201       if unique_id not in existing and unique_id is not None:
202         break
203     else:
204       raise errors.ConfigurationError("Not able generate an unique ID"
205                                       " (last tried ID: %s" % unique_id)
206     self._temporary_ids.add(unique_id)
207     return unique_id
208
209   def _AllMACs(self):
210     """Return all MACs present in the config.
211
212     @rtype: list
213     @return: the list of all MACs
214
215     """
216     result = []
217     for instance in self._config_data.instances.values():
218       for nic in instance.nics:
219         result.append(nic.mac)
220
221     return result
222
223   def _AllDRBDSecrets(self):
224     """Return all DRBD secrets present in the config.
225
226     @rtype: list
227     @return: the list of all DRBD secrets
228
229     """
230     def helper(disk, result):
231       """Recursively gather secrets from this disk."""
232       if disk.dev_type == constants.DT_DRBD8:
233         result.append(disk.logical_id[5])
234       if disk.children:
235         for child in disk.children:
236           helper(child, result)
237
238     result = []
239     for instance in self._config_data.instances.values():
240       for disk in instance.disks:
241         helper(disk, result)
242
243     return result
244
245   def _CheckDiskIDs(self, disk, l_ids, p_ids):
246     """Compute duplicate disk IDs
247
248     @type disk: L{objects.Disk}
249     @param disk: the disk at which to start searching
250     @type l_ids: list
251     @param l_ids: list of current logical ids
252     @type p_ids: list
253     @param p_ids: list of current physical ids
254     @rtype: list
255     @return: a list of error messages
256
257     """
258     result = []
259     if disk.logical_id is not None:
260       if disk.logical_id in l_ids:
261         result.append("duplicate logical id %s" % str(disk.logical_id))
262       else:
263         l_ids.append(disk.logical_id)
264     if disk.physical_id is not None:
265       if disk.physical_id in p_ids:
266         result.append("duplicate physical id %s" % str(disk.physical_id))
267       else:
268         p_ids.append(disk.physical_id)
269
270     if disk.children:
271       for child in disk.children:
272         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
273     return result
274
275   def _UnlockedVerifyConfig(self):
276     """Verify function.
277
278     @rtype: list
279     @return: a list of error messages; a non-empty list signifies
280         configuration errors
281
282     """
283     result = []
284     seen_macs = []
285     ports = {}
286     data = self._config_data
287     seen_lids = []
288     seen_pids = []
289
290     # global cluster checks
291     if not data.cluster.enabled_hypervisors:
292       result.append("enabled hypervisors list doesn't have any entries")
293     invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
294     if invalid_hvs:
295       result.append("enabled hypervisors contains invalid entries: %s" %
296                     invalid_hvs)
297
298     if data.cluster.master_node not in data.nodes:
299       result.append("cluster has invalid primary node '%s'" %
300                     data.cluster.master_node)
301
302     # per-instance checks
303     for instance_name in data.instances:
304       instance = data.instances[instance_name]
305       if instance.primary_node not in data.nodes:
306         result.append("instance '%s' has invalid primary node '%s'" %
307                       (instance_name, instance.primary_node))
308       for snode in instance.secondary_nodes:
309         if snode not in data.nodes:
310           result.append("instance '%s' has invalid secondary node '%s'" %
311                         (instance_name, snode))
312       for idx, nic in enumerate(instance.nics):
313         if nic.mac in seen_macs:
314           result.append("instance '%s' has NIC %d mac %s duplicate" %
315                         (instance_name, idx, nic.mac))
316         else:
317           seen_macs.append(nic.mac)
318
319       # gather the drbd ports for duplicate checks
320       for dsk in instance.disks:
321         if dsk.dev_type in constants.LDS_DRBD:
322           tcp_port = dsk.logical_id[2]
323           if tcp_port not in ports:
324             ports[tcp_port] = []
325           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
326       # gather network port reservation
327       net_port = getattr(instance, "network_port", None)
328       if net_port is not None:
329         if net_port not in ports:
330           ports[net_port] = []
331         ports[net_port].append((instance.name, "network port"))
332
333       # instance disk verify
334       for idx, disk in enumerate(instance.disks):
335         result.extend(["instance '%s' disk %d error: %s" %
336                        (instance.name, idx, msg) for msg in disk.Verify()])
337         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
338
339     # cluster-wide pool of free ports
340     for free_port in data.cluster.tcpudp_port_pool:
341       if free_port not in ports:
342         ports[free_port] = []
343       ports[free_port].append(("cluster", "port marked as free"))
344
345     # compute tcp/udp duplicate ports
346     keys = ports.keys()
347     keys.sort()
348     for pnum in keys:
349       pdata = ports[pnum]
350       if len(pdata) > 1:
351         txt = ", ".join(["%s/%s" % val for val in pdata])
352         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
353
354     # highest used tcp port check
355     if keys:
356       if keys[-1] > data.cluster.highest_used_port:
357         result.append("Highest used port mismatch, saved %s, computed %s" %
358                       (data.cluster.highest_used_port, keys[-1]))
359
360     if not data.nodes[data.cluster.master_node].master_candidate:
361       result.append("Master node is not a master candidate")
362
363     # master candidate checks
364     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
365     if mc_now < mc_max:
366       result.append("Not enough master candidates: actual %d, target %d" %
367                     (mc_now, mc_max))
368
369     # node checks
370     for node in data.nodes.values():
371       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
372         result.append("Node %s state is invalid: master_candidate=%s,"
373                       " drain=%s, offline=%s" %
374                       (node.name, node.master_candidate, node.drain,
375                        node.offline))
376
377     # drbd minors check
378     d_map, duplicates = self._UnlockedComputeDRBDMap()
379     for node, minor, instance_a, instance_b in duplicates:
380       result.append("DRBD minor %d on node %s is assigned twice to instances"
381                     " %s and %s" % (minor, node, instance_a, instance_b))
382
383     return result
384
385   @locking.ssynchronized(_config_lock, shared=1)
386   def VerifyConfig(self):
387     """Verify function.
388
389     This is just a wrapper over L{_UnlockedVerifyConfig}.
390
391     @rtype: list
392     @return: a list of error messages; a non-empty list signifies
393         configuration errors
394
395     """
396     return self._UnlockedVerifyConfig()
397
398   def _UnlockedSetDiskID(self, disk, node_name):
399     """Convert the unique ID to the ID needed on the target nodes.
400
401     This is used only for drbd, which needs ip/port configuration.
402
403     The routine descends down and updates its children also, because
404     this helps when the only the top device is passed to the remote
405     node.
406
407     This function is for internal use, when the config lock is already held.
408
409     """
410     if disk.children:
411       for child in disk.children:
412         self._UnlockedSetDiskID(child, node_name)
413
414     if disk.logical_id is None and disk.physical_id is not None:
415       return
416     if disk.dev_type == constants.LD_DRBD8:
417       pnode, snode, port, pminor, sminor, secret = disk.logical_id
418       if node_name not in (pnode, snode):
419         raise errors.ConfigurationError("DRBD device not knowing node %s" %
420                                         node_name)
421       pnode_info = self._UnlockedGetNodeInfo(pnode)
422       snode_info = self._UnlockedGetNodeInfo(snode)
423       if pnode_info is None or snode_info is None:
424         raise errors.ConfigurationError("Can't find primary or secondary node"
425                                         " for %s" % str(disk))
426       p_data = (pnode_info.secondary_ip, port)
427       s_data = (snode_info.secondary_ip, port)
428       if pnode == node_name:
429         disk.physical_id = p_data + s_data + (pminor, secret)
430       else: # it must be secondary, we tested above
431         disk.physical_id = s_data + p_data + (sminor, secret)
432     else:
433       disk.physical_id = disk.logical_id
434     return
435
436   @locking.ssynchronized(_config_lock)
437   def SetDiskID(self, disk, node_name):
438     """Convert the unique ID to the ID needed on the target nodes.
439
440     This is used only for drbd, which needs ip/port configuration.
441
442     The routine descends down and updates its children also, because
443     this helps when the only the top device is passed to the remote
444     node.
445
446     """
447     return self._UnlockedSetDiskID(disk, node_name)
448
449   @locking.ssynchronized(_config_lock)
450   def AddTcpUdpPort(self, port):
451     """Adds a new port to the available port pool.
452
453     """
454     if not isinstance(port, int):
455       raise errors.ProgrammerError("Invalid type passed for port")
456
457     self._config_data.cluster.tcpudp_port_pool.add(port)
458     self._WriteConfig()
459
460   @locking.ssynchronized(_config_lock, shared=1)
461   def GetPortList(self):
462     """Returns a copy of the current port list.
463
464     """
465     return self._config_data.cluster.tcpudp_port_pool.copy()
466
467   @locking.ssynchronized(_config_lock)
468   def AllocatePort(self):
469     """Allocate a port.
470
471     The port will be taken from the available port pool or from the
472     default port range (and in this case we increase
473     highest_used_port).
474
475     """
476     # If there are TCP/IP ports configured, we use them first.
477     if self._config_data.cluster.tcpudp_port_pool:
478       port = self._config_data.cluster.tcpudp_port_pool.pop()
479     else:
480       port = self._config_data.cluster.highest_used_port + 1
481       if port >= constants.LAST_DRBD_PORT:
482         raise errors.ConfigurationError("The highest used port is greater"
483                                         " than %s. Aborting." %
484                                         constants.LAST_DRBD_PORT)
485       self._config_data.cluster.highest_used_port = port
486
487     self._WriteConfig()
488     return port
489
490   def _UnlockedComputeDRBDMap(self):
491     """Compute the used DRBD minor/nodes.
492
493     @rtype: (dict, list)
494     @return: dictionary of node_name: dict of minor: instance_name;
495         the returned dict will have all the nodes in it (even if with
496         an empty list), and a list of duplicates; if the duplicates
497         list is not empty, the configuration is corrupted and its caller
498         should raise an exception
499
500     """
501     def _AppendUsedPorts(instance_name, disk, used):
502       duplicates = []
503       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
504         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
505         for node, port in ((node_a, minor_a), (node_b, minor_b)):
506           assert node in used, ("Node '%s' of instance '%s' not found"
507                                 " in node list" % (node, instance_name))
508           if port in used[node]:
509             duplicates.append((node, port, instance_name, used[node][port]))
510           else:
511             used[node][port] = instance_name
512       if disk.children:
513         for child in disk.children:
514           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
515       return duplicates
516
517     duplicates = []
518     my_dict = dict((node, {}) for node in self._config_data.nodes)
519     for instance in self._config_data.instances.itervalues():
520       for disk in instance.disks:
521         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
522     for (node, minor), instance in self._temporary_drbds.iteritems():
523       if minor in my_dict[node] and my_dict[node][minor] != instance:
524         duplicates.append((node, minor, instance, my_dict[node][minor]))
525       else:
526         my_dict[node][minor] = instance
527     return my_dict, duplicates
528
529   @locking.ssynchronized(_config_lock)
530   def ComputeDRBDMap(self):
531     """Compute the used DRBD minor/nodes.
532
533     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
534
535     @return: dictionary of node_name: dict of minor: instance_name;
536         the returned dict will have all the nodes in it (even if with
537         an empty list).
538
539     """
540     d_map, duplicates = self._UnlockedComputeDRBDMap()
541     if duplicates:
542       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
543                                       str(duplicates))
544     return d_map
545
546   @locking.ssynchronized(_config_lock)
547   def AllocateDRBDMinor(self, nodes, instance):
548     """Allocate a drbd minor.
549
550     The free minor will be automatically computed from the existing
551     devices. A node can be given multiple times in order to allocate
552     multiple minors. The result is the list of minors, in the same
553     order as the passed nodes.
554
555     @type instance: string
556     @param instance: the instance for which we allocate minors
557
558     """
559     assert isinstance(instance, basestring), \
560            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
561
562     d_map, duplicates = self._UnlockedComputeDRBDMap()
563     if duplicates:
564       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
565                                       str(duplicates))
566     result = []
567     for nname in nodes:
568       ndata = d_map[nname]
569       if not ndata:
570         # no minors used, we can start at 0
571         result.append(0)
572         ndata[0] = instance
573         self._temporary_drbds[(nname, 0)] = instance
574         continue
575       keys = ndata.keys()
576       keys.sort()
577       ffree = utils.FirstFree(keys)
578       if ffree is None:
579         # return the next minor
580         # TODO: implement high-limit check
581         minor = keys[-1] + 1
582       else:
583         minor = ffree
584       # double-check minor against current instances
585       assert minor not in d_map[nname], \
586              ("Attempt to reuse allocated DRBD minor %d on node %s,"
587               " already allocated to instance %s" %
588               (minor, nname, d_map[nname][minor]))
589       ndata[minor] = instance
590       # double-check minor against reservation
591       r_key = (nname, minor)
592       assert r_key not in self._temporary_drbds, \
593              ("Attempt to reuse reserved DRBD minor %d on node %s,"
594               " reserved for instance %s" %
595               (minor, nname, self._temporary_drbds[r_key]))
596       self._temporary_drbds[r_key] = instance
597       result.append(minor)
598     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
599                   nodes, result)
600     return result
601
602   def _UnlockedReleaseDRBDMinors(self, instance):
603     """Release temporary drbd minors allocated for a given instance.
604
605     @type instance: string
606     @param instance: the instance for which temporary minors should be
607                      released
608
609     """
610     assert isinstance(instance, basestring), \
611            "Invalid argument passed to ReleaseDRBDMinors"
612     for key, name in self._temporary_drbds.items():
613       if name == instance:
614         del self._temporary_drbds[key]
615
616   @locking.ssynchronized(_config_lock)
617   def ReleaseDRBDMinors(self, instance):
618     """Release temporary drbd minors allocated for a given instance.
619
620     This should be called on the error paths, on the success paths
621     it's automatically called by the ConfigWriter add and update
622     functions.
623
624     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
625
626     @type instance: string
627     @param instance: the instance for which temporary minors should be
628                      released
629
630     """
631     self._UnlockedReleaseDRBDMinors(instance)
632
633   @locking.ssynchronized(_config_lock, shared=1)
634   def GetConfigVersion(self):
635     """Get the configuration version.
636
637     @return: Config version
638
639     """
640     return self._config_data.version
641
642   @locking.ssynchronized(_config_lock, shared=1)
643   def GetClusterName(self):
644     """Get cluster name.
645
646     @return: Cluster name
647
648     """
649     return self._config_data.cluster.cluster_name
650
651   @locking.ssynchronized(_config_lock, shared=1)
652   def GetMasterNode(self):
653     """Get the hostname of the master node for this cluster.
654
655     @return: Master hostname
656
657     """
658     return self._config_data.cluster.master_node
659
660   @locking.ssynchronized(_config_lock, shared=1)
661   def GetMasterIP(self):
662     """Get the IP of the master node for this cluster.
663
664     @return: Master IP
665
666     """
667     return self._config_data.cluster.master_ip
668
669   @locking.ssynchronized(_config_lock, shared=1)
670   def GetMasterNetdev(self):
671     """Get the master network device for this cluster.
672
673     """
674     return self._config_data.cluster.master_netdev
675
676   @locking.ssynchronized(_config_lock, shared=1)
677   def GetFileStorageDir(self):
678     """Get the file storage dir for this cluster.
679
680     """
681     return self._config_data.cluster.file_storage_dir
682
683   @locking.ssynchronized(_config_lock, shared=1)
684   def GetHypervisorType(self):
685     """Get the hypervisor type for this cluster.
686
687     """
688     return self._config_data.cluster.default_hypervisor
689
690   @locking.ssynchronized(_config_lock, shared=1)
691   def GetHostKey(self):
692     """Return the rsa hostkey from the config.
693
694     @rtype: string
695     @return: the rsa hostkey
696
697     """
698     return self._config_data.cluster.rsahostkeypub
699
700   @locking.ssynchronized(_config_lock)
701   def AddInstance(self, instance):
702     """Add an instance to the config.
703
704     This should be used after creating a new instance.
705
706     @type instance: L{objects.Instance}
707     @param instance: the instance object
708
709     """
710     if not isinstance(instance, objects.Instance):
711       raise errors.ProgrammerError("Invalid type passed to AddInstance")
712
713     if instance.disk_template != constants.DT_DISKLESS:
714       all_lvs = instance.MapLVsByNode()
715       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
716
717     all_macs = self._AllMACs()
718     for nic in instance.nics:
719       if nic.mac in all_macs:
720         raise errors.ConfigurationError("Cannot add instance %s:"
721           " MAC address '%s' already in use." % (instance.name, nic.mac))
722
723     instance.serial_no = 1
724     self._config_data.instances[instance.name] = instance
725     self._config_data.cluster.serial_no += 1
726     self._UnlockedReleaseDRBDMinors(instance.name)
727     for nic in instance.nics:
728       self._temporary_macs.discard(nic.mac)
729     self._WriteConfig()
730
731   def _SetInstanceStatus(self, instance_name, status):
732     """Set the instance's status to a given value.
733
734     """
735     assert isinstance(status, bool), \
736            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
737
738     if instance_name not in self._config_data.instances:
739       raise errors.ConfigurationError("Unknown instance '%s'" %
740                                       instance_name)
741     instance = self._config_data.instances[instance_name]
742     if instance.admin_up != status:
743       instance.admin_up = status
744       instance.serial_no += 1
745       self._WriteConfig()
746
747   @locking.ssynchronized(_config_lock)
748   def MarkInstanceUp(self, instance_name):
749     """Mark the instance status to up in the config.
750
751     """
752     self._SetInstanceStatus(instance_name, True)
753
754   @locking.ssynchronized(_config_lock)
755   def RemoveInstance(self, instance_name):
756     """Remove the instance from the configuration.
757
758     """
759     if instance_name not in self._config_data.instances:
760       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
761     del self._config_data.instances[instance_name]
762     self._config_data.cluster.serial_no += 1
763     self._WriteConfig()
764
765   @locking.ssynchronized(_config_lock)
766   def RenameInstance(self, old_name, new_name):
767     """Rename an instance.
768
769     This needs to be done in ConfigWriter and not by RemoveInstance
770     combined with AddInstance as only we can guarantee an atomic
771     rename.
772
773     """
774     if old_name not in self._config_data.instances:
775       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
776     inst = self._config_data.instances[old_name]
777     del self._config_data.instances[old_name]
778     inst.name = new_name
779
780     for disk in inst.disks:
781       if disk.dev_type == constants.LD_FILE:
782         # rename the file paths in logical and physical id
783         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
784         disk.physical_id = disk.logical_id = (disk.logical_id[0],
785                                               os.path.join(file_storage_dir,
786                                                            inst.name,
787                                                            disk.iv_name))
788
789     self._config_data.instances[inst.name] = inst
790     self._WriteConfig()
791
792   @locking.ssynchronized(_config_lock)
793   def MarkInstanceDown(self, instance_name):
794     """Mark the status of an instance to down in the configuration.
795
796     """
797     self._SetInstanceStatus(instance_name, False)
798
799   def _UnlockedGetInstanceList(self):
800     """Get the list of instances.
801
802     This function is for internal use, when the config lock is already held.
803
804     """
805     return self._config_data.instances.keys()
806
807   @locking.ssynchronized(_config_lock, shared=1)
808   def GetInstanceList(self):
809     """Get the list of instances.
810
811     @return: array of instances, ex. ['instance2.example.com',
812         'instance1.example.com']
813
814     """
815     return self._UnlockedGetInstanceList()
816
817   @locking.ssynchronized(_config_lock, shared=1)
818   def ExpandInstanceName(self, short_name):
819     """Attempt to expand an incomplete instance name.
820
821     """
822     return utils.MatchNameComponent(short_name,
823                                     self._config_data.instances.keys())
824
825   def _UnlockedGetInstanceInfo(self, instance_name):
826     """Returns information about an instance.
827
828     This function is for internal use, when the config lock is already held.
829
830     """
831     if instance_name not in self._config_data.instances:
832       return None
833
834     return self._config_data.instances[instance_name]
835
836   @locking.ssynchronized(_config_lock, shared=1)
837   def GetInstanceInfo(self, instance_name):
838     """Returns information about an instance.
839
840     It takes the information from the configuration file. Other information of
841     an instance are taken from the live systems.
842
843     @param instance_name: name of the instance, e.g.
844         I{instance1.example.com}
845
846     @rtype: L{objects.Instance}
847     @return: the instance object
848
849     """
850     return self._UnlockedGetInstanceInfo(instance_name)
851
852   @locking.ssynchronized(_config_lock, shared=1)
853   def GetAllInstancesInfo(self):
854     """Get the configuration of all instances.
855
856     @rtype: dict
857     @return: dict of (instance, instance_info), where instance_info is what
858               would GetInstanceInfo return for the node
859
860     """
861     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
862                     for instance in self._UnlockedGetInstanceList()])
863     return my_dict
864
865   @locking.ssynchronized(_config_lock)
866   def AddNode(self, node):
867     """Add a node to the configuration.
868
869     @type node: L{objects.Node}
870     @param node: a Node instance
871
872     """
873     logging.info("Adding node %s to configuration" % node.name)
874
875     node.serial_no = 1
876     self._config_data.nodes[node.name] = node
877     self._config_data.cluster.serial_no += 1
878     self._WriteConfig()
879
880   @locking.ssynchronized(_config_lock)
881   def RemoveNode(self, node_name):
882     """Remove a node from the configuration.
883
884     """
885     logging.info("Removing node %s from configuration" % node_name)
886
887     if node_name not in self._config_data.nodes:
888       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
889
890     del self._config_data.nodes[node_name]
891     self._config_data.cluster.serial_no += 1
892     self._WriteConfig()
893
894   @locking.ssynchronized(_config_lock, shared=1)
895   def ExpandNodeName(self, short_name):
896     """Attempt to expand an incomplete instance name.
897
898     """
899     return utils.MatchNameComponent(short_name,
900                                     self._config_data.nodes.keys())
901
902   def _UnlockedGetNodeInfo(self, node_name):
903     """Get the configuration of a node, as stored in the config.
904
905     This function is for internal use, when the config lock is already
906     held.
907
908     @param node_name: the node name, e.g. I{node1.example.com}
909
910     @rtype: L{objects.Node}
911     @return: the node object
912
913     """
914     if node_name not in self._config_data.nodes:
915       return None
916
917     return self._config_data.nodes[node_name]
918
919
920   @locking.ssynchronized(_config_lock, shared=1)
921   def GetNodeInfo(self, node_name):
922     """Get the configuration of a node, as stored in the config.
923
924     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
925
926     @param node_name: the node name, e.g. I{node1.example.com}
927
928     @rtype: L{objects.Node}
929     @return: the node object
930
931     """
932     return self._UnlockedGetNodeInfo(node_name)
933
934   def _UnlockedGetNodeList(self):
935     """Return the list of nodes which are in the configuration.
936
937     This function is for internal use, when the config lock is already
938     held.
939
940     @rtype: list
941
942     """
943     return self._config_data.nodes.keys()
944
945
946   @locking.ssynchronized(_config_lock, shared=1)
947   def GetNodeList(self):
948     """Return the list of nodes which are in the configuration.
949
950     """
951     return self._UnlockedGetNodeList()
952
953   @locking.ssynchronized(_config_lock, shared=1)
954   def GetOnlineNodeList(self):
955     """Return the list of nodes which are online.
956
957     """
958     all_nodes = [self._UnlockedGetNodeInfo(node)
959                  for node in self._UnlockedGetNodeList()]
960     return [node.name for node in all_nodes if not node.offline]
961
962   @locking.ssynchronized(_config_lock, shared=1)
963   def GetAllNodesInfo(self):
964     """Get the configuration of all nodes.
965
966     @rtype: dict
967     @return: dict of (node, node_info), where node_info is what
968               would GetNodeInfo return for the node
969
970     """
971     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
972                     for node in self._UnlockedGetNodeList()])
973     return my_dict
974
975   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
976     """Get the number of current and maximum desired and possible candidates.
977
978     @type exceptions: list
979     @param exceptions: if passed, list of nodes that should be ignored
980     @rtype: tuple
981     @return: tuple of (current, desired and possible)
982
983     """
984     mc_now = mc_max = 0
985     for node in self._config_data.nodes.values():
986       if exceptions and node.name in exceptions:
987         continue
988       if not (node.offline or node.drained):
989         mc_max += 1
990       if node.master_candidate:
991         mc_now += 1
992     mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
993     return (mc_now, mc_max)
994
995   @locking.ssynchronized(_config_lock, shared=1)
996   def GetMasterCandidateStats(self, exceptions=None):
997     """Get the number of current and maximum possible candidates.
998
999     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1000
1001     @type exceptions: list
1002     @param exceptions: if passed, list of nodes that should be ignored
1003     @rtype: tuple
1004     @return: tuple of (current, max)
1005
1006     """
1007     return self._UnlockedGetMasterCandidateStats(exceptions)
1008
1009   @locking.ssynchronized(_config_lock)
1010   def MaintainCandidatePool(self):
1011     """Try to grow the candidate pool to the desired size.
1012
1013     @rtype: list
1014     @return: list with the adjusted nodes (L{objects.Node} instances)
1015
1016     """
1017     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
1018     mod_list = []
1019     if mc_now < mc_max:
1020       node_list = self._config_data.nodes.keys()
1021       random.shuffle(node_list)
1022       for name in node_list:
1023         if mc_now >= mc_max:
1024           break
1025         node = self._config_data.nodes[name]
1026         if node.master_candidate or node.offline or node.drained:
1027           continue
1028         mod_list.append(node)
1029         node.master_candidate = True
1030         node.serial_no += 1
1031         mc_now += 1
1032       if mc_now != mc_max:
1033         # this should not happen
1034         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1035                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1036       if mod_list:
1037         self._config_data.cluster.serial_no += 1
1038         self._WriteConfig()
1039
1040     return mod_list
1041
1042   def _BumpSerialNo(self):
1043     """Bump up the serial number of the config.
1044
1045     """
1046     self._config_data.serial_no += 1
1047
1048   def _OpenConfig(self):
1049     """Read the config data from disk.
1050
1051     """
1052     f = open(self._cfg_file, 'r')
1053     try:
1054       try:
1055         data = objects.ConfigData.FromDict(serializer.Load(f.read()))
1056       except Exception, err:
1057         raise errors.ConfigurationError(err)
1058     finally:
1059       f.close()
1060
1061     # Make sure the configuration has the right version
1062     _ValidateConfig(data)
1063
1064     if (not hasattr(data, 'cluster') or
1065         not hasattr(data.cluster, 'rsahostkeypub')):
1066       raise errors.ConfigurationError("Incomplete configuration"
1067                                       " (missing cluster.rsahostkeypub)")
1068
1069     # Upgrade configuration if needed
1070     data.UpgradeConfig()
1071
1072     self._config_data = data
1073     # reset the last serial as -1 so that the next write will cause
1074     # ssconf update
1075     self._last_cluster_serial = -1
1076
1077   def _DistributeConfig(self):
1078     """Distribute the configuration to the other nodes.
1079
1080     Currently, this only copies the configuration file. In the future,
1081     it could be used to encapsulate the 2/3-phase update mechanism.
1082
1083     """
1084     if self._offline:
1085       return True
1086     bad = False
1087
1088     node_list = []
1089     addr_list = []
1090     myhostname = self._my_hostname
1091     # we can skip checking whether _UnlockedGetNodeInfo returns None
1092     # since the node list comes from _UnlocketGetNodeList, and we are
1093     # called with the lock held, so no modifications should take place
1094     # in between
1095     for node_name in self._UnlockedGetNodeList():
1096       if node_name == myhostname:
1097         continue
1098       node_info = self._UnlockedGetNodeInfo(node_name)
1099       if not node_info.master_candidate:
1100         continue
1101       node_list.append(node_info.name)
1102       addr_list.append(node_info.primary_ip)
1103
1104     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1105                                             address_list=addr_list)
1106     for node in node_list:
1107       if not result[node]:
1108         logging.error("copy of file %s to node %s failed",
1109                       self._cfg_file, node)
1110         bad = True
1111     return not bad
1112
1113   def _WriteConfig(self, destination=None):
1114     """Write the configuration data to persistent storage.
1115
1116     """
1117     config_errors = self._UnlockedVerifyConfig()
1118     if config_errors:
1119       raise errors.ConfigurationError("Configuration data is not"
1120                                       " consistent: %s" %
1121                                       (", ".join(config_errors)))
1122     if destination is None:
1123       destination = self._cfg_file
1124     self._BumpSerialNo()
1125     txt = serializer.Dump(self._config_data.ToDict())
1126     dir_name, file_name = os.path.split(destination)
1127     fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
1128     f = os.fdopen(fd, 'w')
1129     try:
1130       f.write(txt)
1131       os.fsync(f.fileno())
1132     finally:
1133       f.close()
1134     # we don't need to do os.close(fd) as f.close() did it
1135     os.rename(name, destination)
1136     self.write_count += 1
1137
1138     # and redistribute the config file to master candidates
1139     self._DistributeConfig()
1140
1141     # Write ssconf files on all nodes (including locally)
1142     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1143       if not self._offline:
1144         rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
1145                                               self._UnlockedGetSsconfValues())
1146       self._last_cluster_serial = self._config_data.cluster.serial_no
1147
1148   def _UnlockedGetSsconfValues(self):
1149     """Return the values needed by ssconf.
1150
1151     @rtype: dict
1152     @return: a dictionary with keys the ssconf names and values their
1153         associated value
1154
1155     """
1156     fn = "\n".join
1157     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1158     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1159     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1160
1161     instance_data = fn(instance_names)
1162     off_data = fn(node.name for node in node_info if node.offline)
1163     on_data = fn(node.name for node in node_info if not node.offline)
1164     mc_data = fn(node.name for node in node_info if node.master_candidate)
1165     node_data = fn(node_names)
1166
1167     cluster = self._config_data.cluster
1168     cluster_tags = fn(cluster.GetTags())
1169     return {
1170       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1171       constants.SS_CLUSTER_TAGS: cluster_tags,
1172       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1173       constants.SS_MASTER_CANDIDATES: mc_data,
1174       constants.SS_MASTER_IP: cluster.master_ip,
1175       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1176       constants.SS_MASTER_NODE: cluster.master_node,
1177       constants.SS_NODE_LIST: node_data,
1178       constants.SS_OFFLINE_NODES: off_data,
1179       constants.SS_ONLINE_NODES: on_data,
1180       constants.SS_INSTANCE_LIST: instance_data,
1181       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1182       }
1183
1184   @locking.ssynchronized(_config_lock, shared=1)
1185   def GetVGName(self):
1186     """Return the volume group name.
1187
1188     """
1189     return self._config_data.cluster.volume_group_name
1190
1191   @locking.ssynchronized(_config_lock)
1192   def SetVGName(self, vg_name):
1193     """Set the volume group name.
1194
1195     """
1196     self._config_data.cluster.volume_group_name = vg_name
1197     self._config_data.cluster.serial_no += 1
1198     self._WriteConfig()
1199
1200   @locking.ssynchronized(_config_lock, shared=1)
1201   def GetDefBridge(self):
1202     """Return the default bridge.
1203
1204     """
1205     return self._config_data.cluster.default_bridge
1206
1207   @locking.ssynchronized(_config_lock, shared=1)
1208   def GetMACPrefix(self):
1209     """Return the mac prefix.
1210
1211     """
1212     return self._config_data.cluster.mac_prefix
1213
1214   @locking.ssynchronized(_config_lock, shared=1)
1215   def GetClusterInfo(self):
1216     """Returns information about the cluster
1217
1218     @rtype: L{objects.Cluster}
1219     @return: the cluster object
1220
1221     """
1222     return self._config_data.cluster
1223
1224   @locking.ssynchronized(_config_lock)
1225   def Update(self, target):
1226     """Notify function to be called after updates.
1227
1228     This function must be called when an object (as returned by
1229     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1230     caller wants the modifications saved to the backing store. Note
1231     that all modified objects will be saved, but the target argument
1232     is the one the caller wants to ensure that it's saved.
1233
1234     @param target: an instance of either L{objects.Cluster},
1235         L{objects.Node} or L{objects.Instance} which is existing in
1236         the cluster
1237
1238     """
1239     if self._config_data is None:
1240       raise errors.ProgrammerError("Configuration file not read,"
1241                                    " cannot save.")
1242     update_serial = False
1243     if isinstance(target, objects.Cluster):
1244       test = target == self._config_data.cluster
1245     elif isinstance(target, objects.Node):
1246       test = target in self._config_data.nodes.values()
1247       update_serial = True
1248     elif isinstance(target, objects.Instance):
1249       test = target in self._config_data.instances.values()
1250     else:
1251       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1252                                    " ConfigWriter.Update" % type(target))
1253     if not test:
1254       raise errors.ConfigurationError("Configuration updated since object"
1255                                       " has been read or unknown object")
1256     target.serial_no += 1
1257
1258     if update_serial:
1259       # for node updates, we need to increase the cluster serial too
1260       self._config_data.cluster.serial_no += 1
1261
1262     if isinstance(target, objects.Instance):
1263       self._UnlockedReleaseDRBDMinors(target.name)
1264       for nic in target.nics:
1265         self._temporary_macs.discard(nic.mac)
1266
1267     self._WriteConfig()