Support arguments in utils.RunInSeparateProcess
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import random
36 import logging
37 import time
38
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46
47
48 _config_lock = locking.SharedLock()
49
50 # job id used for resource management at config upgrade time
51 _UPGRADE_CONFIG_JID = "jid-cfg-upgrade"
52
53
54 def _ValidateConfig(data):
55   """Verifies that a configuration objects looks valid.
56
57   This only verifies the version of the configuration.
58
59   @raise errors.ConfigurationError: if the version differs from what
60       we expect
61
62   """
63   if data.version != constants.CONFIG_VERSION:
64     raise errors.ConfigurationError("Cluster configuration version"
65                                     " mismatch, got %s instead of %s" %
66                                     (data.version,
67                                      constants.CONFIG_VERSION))
68
69
70 class TemporaryReservationManager:
71   """A temporary resource reservation manager.
72
73   This is used to reserve resources in a job, before using them, making sure
74   other jobs cannot get them in the meantime.
75
76   """
77   def __init__(self):
78     self._ec_reserved = {}
79
80   def Reserved(self, resource):
81     for holder_reserved in self._ec_reserved.items():
82       if resource in holder_reserved:
83         return True
84     return False
85
86   def Reserve(self, ec_id, resource):
87     if self.Reserved(resource):
88       raise errors.ReservationError("Duplicate reservation for resource: %s." %
89                                     (resource))
90     if ec_id not in self._ec_reserved:
91       self._ec_reserved[ec_id] = set([resource])
92     else:
93       self._ec_reserved[ec_id].add(resource)
94
95   def DropECReservations(self, ec_id):
96     if ec_id in self._ec_reserved:
97       del self._ec_reserved[ec_id]
98
99   def GetReserved(self):
100     all_reserved = set()
101     for holder_reserved in self._ec_reserved.values():
102       all_reserved.update(holder_reserved)
103     return all_reserved
104
105   def Generate(self, existing, generate_one_fn, ec_id):
106     """Generate a new resource of this type
107
108     """
109     assert callable(generate_one_fn)
110
111     all_elems = self.GetReserved()
112     all_elems.update(existing)
113     retries = 64
114     while retries > 0:
115       new_resource = generate_one_fn()
116       if new_resource is not None and new_resource not in all_elems:
117         break
118     else:
119       raise errors.ConfigurationError("Not able generate new resource"
120                                       " (last tried: %s)" % new_resource)
121     self.Reserve(ec_id, new_resource)
122     return new_resource
123
124
125 class ConfigWriter:
126   """The interface to the cluster configuration.
127
128   """
129   def __init__(self, cfg_file=None, offline=False):
130     self.write_count = 0
131     self._lock = _config_lock
132     self._config_data = None
133     self._offline = offline
134     if cfg_file is None:
135       self._cfg_file = constants.CLUSTER_CONF_FILE
136     else:
137       self._cfg_file = cfg_file
138     self._temporary_ids = TemporaryReservationManager()
139     self._temporary_drbds = {}
140     self._temporary_macs = TemporaryReservationManager()
141     self._temporary_secrets = TemporaryReservationManager()
142     # Note: in order to prevent errors when resolving our name in
143     # _DistributeConfig, we compute it here once and reuse it; it's
144     # better to raise an error before starting to modify the config
145     # file than after it was modified
146     self._my_hostname = utils.HostInfo().name
147     self._last_cluster_serial = -1
148     self._OpenConfig()
149
150   # this method needs to be static, so that we can call it on the class
151   @staticmethod
152   def IsCluster():
153     """Check if the cluster is configured.
154
155     """
156     return os.path.exists(constants.CLUSTER_CONF_FILE)
157
158   def _GenerateOneMAC(self):
159     """Generate one mac address
160
161     """
162     prefix = self._config_data.cluster.mac_prefix
163     byte1 = random.randrange(0, 256)
164     byte2 = random.randrange(0, 256)
165     byte3 = random.randrange(0, 256)
166     mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
167     return mac
168
169   @locking.ssynchronized(_config_lock, shared=1)
170   def GenerateMAC(self, ec_id):
171     """Generate a MAC for an instance.
172
173     This should check the current instances for duplicates.
174
175     """
176     existing = self._AllMACs()
177     return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)
178
179   @locking.ssynchronized(_config_lock, shared=1)
180   def ReserveMAC(self, mac, ec_id):
181     """Reserve a MAC for an instance.
182
183     This only checks instances managed by this cluster, it does not
184     check for potential collisions elsewhere.
185
186     """
187     all_macs = self._AllMACs()
188     if mac in all_macs:
189       raise errors.ReservationError("mac already in use")
190     else:
191       self._temporary_macs.Reserve(mac, ec_id)
192
193   @locking.ssynchronized(_config_lock, shared=1)
194   def GenerateDRBDSecret(self, ec_id):
195     """Generate a DRBD secret.
196
197     This checks the current disks for duplicates.
198
199     """
200     return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
201                                             utils.GenerateSecret,
202                                             ec_id)
203
204   def _AllLVs(self):
205     """Compute the list of all LVs.
206
207     """
208     lvnames = set()
209     for instance in self._config_data.instances.values():
210       node_data = instance.MapLVsByNode()
211       for lv_list in node_data.values():
212         lvnames.update(lv_list)
213     return lvnames
214
215   def _AllIDs(self, include_temporary):
216     """Compute the list of all UUIDs and names we have.
217
218     @type include_temporary: boolean
219     @param include_temporary: whether to include the _temporary_ids set
220     @rtype: set
221     @return: a set of IDs
222
223     """
224     existing = set()
225     if include_temporary:
226       existing.update(self._temporary_ids.GetReserved())
227     existing.update(self._AllLVs())
228     existing.update(self._config_data.instances.keys())
229     existing.update(self._config_data.nodes.keys())
230     existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
231     return existing
232
233   def _GenerateUniqueID(self, ec_id):
234     """Generate an unique UUID.
235
236     This checks the current node, instances and disk names for
237     duplicates.
238
239     @rtype: string
240     @return: the unique id
241
242     """
243     existing = self._AllIDs(include_temporary=False)
244     return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)
245
246   @locking.ssynchronized(_config_lock, shared=1)
247   def GenerateUniqueID(self, ec_id):
248     """Generate an unique ID.
249
250     This is just a wrapper over the unlocked version.
251
252     @type ec_id: string
253     @param ec_id: unique id for the job to reserve the id to
254
255     """
256     return self._GenerateUniqueID(ec_id)
257
258   def _AllMACs(self):
259     """Return all MACs present in the config.
260
261     @rtype: list
262     @return: the list of all MACs
263
264     """
265     result = []
266     for instance in self._config_data.instances.values():
267       for nic in instance.nics:
268         result.append(nic.mac)
269
270     return result
271
272   def _AllDRBDSecrets(self):
273     """Return all DRBD secrets present in the config.
274
275     @rtype: list
276     @return: the list of all DRBD secrets
277
278     """
279     def helper(disk, result):
280       """Recursively gather secrets from this disk."""
281       if disk.dev_type == constants.DT_DRBD8:
282         result.append(disk.logical_id[5])
283       if disk.children:
284         for child in disk.children:
285           helper(child, result)
286
287     result = []
288     for instance in self._config_data.instances.values():
289       for disk in instance.disks:
290         helper(disk, result)
291
292     return result
293
294   def _CheckDiskIDs(self, disk, l_ids, p_ids):
295     """Compute duplicate disk IDs
296
297     @type disk: L{objects.Disk}
298     @param disk: the disk at which to start searching
299     @type l_ids: list
300     @param l_ids: list of current logical ids
301     @type p_ids: list
302     @param p_ids: list of current physical ids
303     @rtype: list
304     @return: a list of error messages
305
306     """
307     result = []
308     if disk.logical_id is not None:
309       if disk.logical_id in l_ids:
310         result.append("duplicate logical id %s" % str(disk.logical_id))
311       else:
312         l_ids.append(disk.logical_id)
313     if disk.physical_id is not None:
314       if disk.physical_id in p_ids:
315         result.append("duplicate physical id %s" % str(disk.physical_id))
316       else:
317         p_ids.append(disk.physical_id)
318
319     if disk.children:
320       for child in disk.children:
321         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
322     return result
323
324   def _UnlockedVerifyConfig(self):
325     """Verify function.
326
327     @rtype: list
328     @return: a list of error messages; a non-empty list signifies
329         configuration errors
330
331     """
332     result = []
333     seen_macs = []
334     ports = {}
335     data = self._config_data
336     seen_lids = []
337     seen_pids = []
338
339     # global cluster checks
340     if not data.cluster.enabled_hypervisors:
341       result.append("enabled hypervisors list doesn't have any entries")
342     invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
343     if invalid_hvs:
344       result.append("enabled hypervisors contains invalid entries: %s" %
345                     invalid_hvs)
346
347     if data.cluster.master_node not in data.nodes:
348       result.append("cluster has invalid primary node '%s'" %
349                     data.cluster.master_node)
350
351     # per-instance checks
352     for instance_name in data.instances:
353       instance = data.instances[instance_name]
354       if instance.primary_node not in data.nodes:
355         result.append("instance '%s' has invalid primary node '%s'" %
356                       (instance_name, instance.primary_node))
357       for snode in instance.secondary_nodes:
358         if snode not in data.nodes:
359           result.append("instance '%s' has invalid secondary node '%s'" %
360                         (instance_name, snode))
361       for idx, nic in enumerate(instance.nics):
362         if nic.mac in seen_macs:
363           result.append("instance '%s' has NIC %d mac %s duplicate" %
364                         (instance_name, idx, nic.mac))
365         else:
366           seen_macs.append(nic.mac)
367
368       # gather the drbd ports for duplicate checks
369       for dsk in instance.disks:
370         if dsk.dev_type in constants.LDS_DRBD:
371           tcp_port = dsk.logical_id[2]
372           if tcp_port not in ports:
373             ports[tcp_port] = []
374           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
375       # gather network port reservation
376       net_port = getattr(instance, "network_port", None)
377       if net_port is not None:
378         if net_port not in ports:
379           ports[net_port] = []
380         ports[net_port].append((instance.name, "network port"))
381
382       # instance disk verify
383       for idx, disk in enumerate(instance.disks):
384         result.extend(["instance '%s' disk %d error: %s" %
385                        (instance.name, idx, msg) for msg in disk.Verify()])
386         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
387
388     # cluster-wide pool of free ports
389     for free_port in data.cluster.tcpudp_port_pool:
390       if free_port not in ports:
391         ports[free_port] = []
392       ports[free_port].append(("cluster", "port marked as free"))
393
394     # compute tcp/udp duplicate ports
395     keys = ports.keys()
396     keys.sort()
397     for pnum in keys:
398       pdata = ports[pnum]
399       if len(pdata) > 1:
400         txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
401         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
402
403     # highest used tcp port check
404     if keys:
405       if keys[-1] > data.cluster.highest_used_port:
406         result.append("Highest used port mismatch, saved %s, computed %s" %
407                       (data.cluster.highest_used_port, keys[-1]))
408
409     if not data.nodes[data.cluster.master_node].master_candidate:
410       result.append("Master node is not a master candidate")
411
412     # master candidate checks
413     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
414     if mc_now < mc_max:
415       result.append("Not enough master candidates: actual %d, target %d" %
416                     (mc_now, mc_max))
417
418     # node checks
419     for node in data.nodes.values():
420       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
421         result.append("Node %s state is invalid: master_candidate=%s,"
422                       " drain=%s, offline=%s" %
423                       (node.name, node.master_candidate, node.drain,
424                        node.offline))
425
426     # drbd minors check
427     _, duplicates = self._UnlockedComputeDRBDMap()
428     for node, minor, instance_a, instance_b in duplicates:
429       result.append("DRBD minor %d on node %s is assigned twice to instances"
430                     " %s and %s" % (minor, node, instance_a, instance_b))
431
432     # IP checks
433     default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
434     ips = {}
435
436     def _AddIpAddress(ip, name):
437       ips.setdefault(ip, []).append(name)
438
439     _AddIpAddress(data.cluster.master_ip, "cluster_ip")
440
441     for node in data.nodes.values():
442       _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
443       if node.secondary_ip != node.primary_ip:
444         _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)
445
446     for instance in data.instances.values():
447       for idx, nic in enumerate(instance.nics):
448         if nic.ip is None:
449           continue
450
451         nicparams = objects.FillDict(default_nicparams, nic.nicparams)
452         nic_mode = nicparams[constants.NIC_MODE]
453         nic_link = nicparams[constants.NIC_LINK]
454
455         if nic_mode == constants.NIC_MODE_BRIDGED:
456           link = "bridge:%s" % nic_link
457         elif nic_mode == constants.NIC_MODE_ROUTED:
458           link = "route:%s" % nic_link
459         else:
460           raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)
461
462         _AddIpAddress("%s/%s" % (link, nic.ip),
463                       "instance:%s/nic:%d" % (instance.name, idx))
464
465     for ip, owners in ips.items():
466       if len(owners) > 1:
467         result.append("IP address %s is used by multiple owners: %s" %
468                       (ip, utils.CommaJoin(owners)))
469
470     return result
471
472   @locking.ssynchronized(_config_lock, shared=1)
473   def VerifyConfig(self):
474     """Verify function.
475
476     This is just a wrapper over L{_UnlockedVerifyConfig}.
477
478     @rtype: list
479     @return: a list of error messages; a non-empty list signifies
480         configuration errors
481
482     """
483     return self._UnlockedVerifyConfig()
484
485   def _UnlockedSetDiskID(self, disk, node_name):
486     """Convert the unique ID to the ID needed on the target nodes.
487
488     This is used only for drbd, which needs ip/port configuration.
489
490     The routine descends down and updates its children also, because
491     this helps when the only the top device is passed to the remote
492     node.
493
494     This function is for internal use, when the config lock is already held.
495
496     """
497     if disk.children:
498       for child in disk.children:
499         self._UnlockedSetDiskID(child, node_name)
500
501     if disk.logical_id is None and disk.physical_id is not None:
502       return
503     if disk.dev_type == constants.LD_DRBD8:
504       pnode, snode, port, pminor, sminor, secret = disk.logical_id
505       if node_name not in (pnode, snode):
506         raise errors.ConfigurationError("DRBD device not knowing node %s" %
507                                         node_name)
508       pnode_info = self._UnlockedGetNodeInfo(pnode)
509       snode_info = self._UnlockedGetNodeInfo(snode)
510       if pnode_info is None or snode_info is None:
511         raise errors.ConfigurationError("Can't find primary or secondary node"
512                                         " for %s" % str(disk))
513       p_data = (pnode_info.secondary_ip, port)
514       s_data = (snode_info.secondary_ip, port)
515       if pnode == node_name:
516         disk.physical_id = p_data + s_data + (pminor, secret)
517       else: # it must be secondary, we tested above
518         disk.physical_id = s_data + p_data + (sminor, secret)
519     else:
520       disk.physical_id = disk.logical_id
521     return
522
523   @locking.ssynchronized(_config_lock)
524   def SetDiskID(self, disk, node_name):
525     """Convert the unique ID to the ID needed on the target nodes.
526
527     This is used only for drbd, which needs ip/port configuration.
528
529     The routine descends down and updates its children also, because
530     this helps when the only the top device is passed to the remote
531     node.
532
533     """
534     return self._UnlockedSetDiskID(disk, node_name)
535
536   @locking.ssynchronized(_config_lock)
537   def AddTcpUdpPort(self, port):
538     """Adds a new port to the available port pool.
539
540     """
541     if not isinstance(port, int):
542       raise errors.ProgrammerError("Invalid type passed for port")
543
544     self._config_data.cluster.tcpudp_port_pool.add(port)
545     self._WriteConfig()
546
547   @locking.ssynchronized(_config_lock, shared=1)
548   def GetPortList(self):
549     """Returns a copy of the current port list.
550
551     """
552     return self._config_data.cluster.tcpudp_port_pool.copy()
553
554   @locking.ssynchronized(_config_lock)
555   def AllocatePort(self):
556     """Allocate a port.
557
558     The port will be taken from the available port pool or from the
559     default port range (and in this case we increase
560     highest_used_port).
561
562     """
563     # If there are TCP/IP ports configured, we use them first.
564     if self._config_data.cluster.tcpudp_port_pool:
565       port = self._config_data.cluster.tcpudp_port_pool.pop()
566     else:
567       port = self._config_data.cluster.highest_used_port + 1
568       if port >= constants.LAST_DRBD_PORT:
569         raise errors.ConfigurationError("The highest used port is greater"
570                                         " than %s. Aborting." %
571                                         constants.LAST_DRBD_PORT)
572       self._config_data.cluster.highest_used_port = port
573
574     self._WriteConfig()
575     return port
576
577   def _UnlockedComputeDRBDMap(self):
578     """Compute the used DRBD minor/nodes.
579
580     @rtype: (dict, list)
581     @return: dictionary of node_name: dict of minor: instance_name;
582         the returned dict will have all the nodes in it (even if with
583         an empty list), and a list of duplicates; if the duplicates
584         list is not empty, the configuration is corrupted and its caller
585         should raise an exception
586
587     """
588     def _AppendUsedPorts(instance_name, disk, used):
589       duplicates = []
590       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
591         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
592         for node, port in ((node_a, minor_a), (node_b, minor_b)):
593           assert node in used, ("Node '%s' of instance '%s' not found"
594                                 " in node list" % (node, instance_name))
595           if port in used[node]:
596             duplicates.append((node, port, instance_name, used[node][port]))
597           else:
598             used[node][port] = instance_name
599       if disk.children:
600         for child in disk.children:
601           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
602       return duplicates
603
604     duplicates = []
605     my_dict = dict((node, {}) for node in self._config_data.nodes)
606     for instance in self._config_data.instances.itervalues():
607       for disk in instance.disks:
608         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
609     for (node, minor), instance in self._temporary_drbds.iteritems():
610       if minor in my_dict[node] and my_dict[node][minor] != instance:
611         duplicates.append((node, minor, instance, my_dict[node][minor]))
612       else:
613         my_dict[node][minor] = instance
614     return my_dict, duplicates
615
616   @locking.ssynchronized(_config_lock)
617   def ComputeDRBDMap(self):
618     """Compute the used DRBD minor/nodes.
619
620     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
621
622     @return: dictionary of node_name: dict of minor: instance_name;
623         the returned dict will have all the nodes in it (even if with
624         an empty list).
625
626     """
627     d_map, duplicates = self._UnlockedComputeDRBDMap()
628     if duplicates:
629       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
630                                       str(duplicates))
631     return d_map
632
633   @locking.ssynchronized(_config_lock)
634   def AllocateDRBDMinor(self, nodes, instance):
635     """Allocate a drbd minor.
636
637     The free minor will be automatically computed from the existing
638     devices. A node can be given multiple times in order to allocate
639     multiple minors. The result is the list of minors, in the same
640     order as the passed nodes.
641
642     @type instance: string
643     @param instance: the instance for which we allocate minors
644
645     """
646     assert isinstance(instance, basestring), \
647            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
648
649     d_map, duplicates = self._UnlockedComputeDRBDMap()
650     if duplicates:
651       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
652                                       str(duplicates))
653     result = []
654     for nname in nodes:
655       ndata = d_map[nname]
656       if not ndata:
657         # no minors used, we can start at 0
658         result.append(0)
659         ndata[0] = instance
660         self._temporary_drbds[(nname, 0)] = instance
661         continue
662       keys = ndata.keys()
663       keys.sort()
664       ffree = utils.FirstFree(keys)
665       if ffree is None:
666         # return the next minor
667         # TODO: implement high-limit check
668         minor = keys[-1] + 1
669       else:
670         minor = ffree
671       # double-check minor against current instances
672       assert minor not in d_map[nname], \
673              ("Attempt to reuse allocated DRBD minor %d on node %s,"
674               " already allocated to instance %s" %
675               (minor, nname, d_map[nname][minor]))
676       ndata[minor] = instance
677       # double-check minor against reservation
678       r_key = (nname, minor)
679       assert r_key not in self._temporary_drbds, \
680              ("Attempt to reuse reserved DRBD minor %d on node %s,"
681               " reserved for instance %s" %
682               (minor, nname, self._temporary_drbds[r_key]))
683       self._temporary_drbds[r_key] = instance
684       result.append(minor)
685     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
686                   nodes, result)
687     return result
688
689   def _UnlockedReleaseDRBDMinors(self, instance):
690     """Release temporary drbd minors allocated for a given instance.
691
692     @type instance: string
693     @param instance: the instance for which temporary minors should be
694                      released
695
696     """
697     assert isinstance(instance, basestring), \
698            "Invalid argument passed to ReleaseDRBDMinors"
699     for key, name in self._temporary_drbds.items():
700       if name == instance:
701         del self._temporary_drbds[key]
702
703   @locking.ssynchronized(_config_lock)
704   def ReleaseDRBDMinors(self, instance):
705     """Release temporary drbd minors allocated for a given instance.
706
707     This should be called on the error paths, on the success paths
708     it's automatically called by the ConfigWriter add and update
709     functions.
710
711     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
712
713     @type instance: string
714     @param instance: the instance for which temporary minors should be
715                      released
716
717     """
718     self._UnlockedReleaseDRBDMinors(instance)
719
720   @locking.ssynchronized(_config_lock, shared=1)
721   def GetConfigVersion(self):
722     """Get the configuration version.
723
724     @return: Config version
725
726     """
727     return self._config_data.version
728
729   @locking.ssynchronized(_config_lock, shared=1)
730   def GetClusterName(self):
731     """Get cluster name.
732
733     @return: Cluster name
734
735     """
736     return self._config_data.cluster.cluster_name
737
738   @locking.ssynchronized(_config_lock, shared=1)
739   def GetMasterNode(self):
740     """Get the hostname of the master node for this cluster.
741
742     @return: Master hostname
743
744     """
745     return self._config_data.cluster.master_node
746
747   @locking.ssynchronized(_config_lock, shared=1)
748   def GetMasterIP(self):
749     """Get the IP of the master node for this cluster.
750
751     @return: Master IP
752
753     """
754     return self._config_data.cluster.master_ip
755
756   @locking.ssynchronized(_config_lock, shared=1)
757   def GetMasterNetdev(self):
758     """Get the master network device for this cluster.
759
760     """
761     return self._config_data.cluster.master_netdev
762
763   @locking.ssynchronized(_config_lock, shared=1)
764   def GetFileStorageDir(self):
765     """Get the file storage dir for this cluster.
766
767     """
768     return self._config_data.cluster.file_storage_dir
769
770   @locking.ssynchronized(_config_lock, shared=1)
771   def GetHypervisorType(self):
772     """Get the hypervisor type for this cluster.
773
774     """
775     return self._config_data.cluster.enabled_hypervisors[0]
776
777   @locking.ssynchronized(_config_lock, shared=1)
778   def GetHostKey(self):
779     """Return the rsa hostkey from the config.
780
781     @rtype: string
782     @return: the rsa hostkey
783
784     """
785     return self._config_data.cluster.rsahostkeypub
786
787   @locking.ssynchronized(_config_lock)
788   def AddInstance(self, instance, ec_id):
789     """Add an instance to the config.
790
791     This should be used after creating a new instance.
792
793     @type instance: L{objects.Instance}
794     @param instance: the instance object
795
796     """
797     if not isinstance(instance, objects.Instance):
798       raise errors.ProgrammerError("Invalid type passed to AddInstance")
799
800     if instance.disk_template != constants.DT_DISKLESS:
801       all_lvs = instance.MapLVsByNode()
802       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
803
804     all_macs = self._AllMACs()
805     for nic in instance.nics:
806       if nic.mac in all_macs:
807         raise errors.ConfigurationError("Cannot add instance %s:"
808                                         " MAC address '%s' already in use." %
809                                         (instance.name, nic.mac))
810
811     self._EnsureUUID(instance, ec_id)
812
813     instance.serial_no = 1
814     instance.ctime = instance.mtime = time.time()
815     self._config_data.instances[instance.name] = instance
816     self._config_data.cluster.serial_no += 1
817     self._UnlockedReleaseDRBDMinors(instance.name)
818     self._WriteConfig()
819
820   def _EnsureUUID(self, item, ec_id):
821     """Ensures a given object has a valid UUID.
822
823     @param item: the instance or node to be checked
824     @param ec_id: the execution context id for the uuid reservation
825
826     """
827     if not item.uuid:
828       item.uuid = self._GenerateUniqueID(ec_id)
829     elif item.uuid in self._AllIDs(include_temporary=True):
830       raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
831                                       " in use" % (item.name, item.uuid))
832
833   def _SetInstanceStatus(self, instance_name, status):
834     """Set the instance's status to a given value.
835
836     """
837     assert isinstance(status, bool), \
838            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
839
840     if instance_name not in self._config_data.instances:
841       raise errors.ConfigurationError("Unknown instance '%s'" %
842                                       instance_name)
843     instance = self._config_data.instances[instance_name]
844     if instance.admin_up != status:
845       instance.admin_up = status
846       instance.serial_no += 1
847       instance.mtime = time.time()
848       self._WriteConfig()
849
850   @locking.ssynchronized(_config_lock)
851   def MarkInstanceUp(self, instance_name):
852     """Mark the instance status to up in the config.
853
854     """
855     self._SetInstanceStatus(instance_name, True)
856
857   @locking.ssynchronized(_config_lock)
858   def RemoveInstance(self, instance_name):
859     """Remove the instance from the configuration.
860
861     """
862     if instance_name not in self._config_data.instances:
863       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
864     del self._config_data.instances[instance_name]
865     self._config_data.cluster.serial_no += 1
866     self._WriteConfig()
867
868   @locking.ssynchronized(_config_lock)
869   def RenameInstance(self, old_name, new_name):
870     """Rename an instance.
871
872     This needs to be done in ConfigWriter and not by RemoveInstance
873     combined with AddInstance as only we can guarantee an atomic
874     rename.
875
876     """
877     if old_name not in self._config_data.instances:
878       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
879     inst = self._config_data.instances[old_name]
880     del self._config_data.instances[old_name]
881     inst.name = new_name
882
883     for disk in inst.disks:
884       if disk.dev_type == constants.LD_FILE:
885         # rename the file paths in logical and physical id
886         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
887         disk.physical_id = disk.logical_id = (disk.logical_id[0],
888                                               utils.PathJoin(file_storage_dir,
889                                                              inst.name,
890                                                              disk.iv_name))
891
892     self._config_data.instances[inst.name] = inst
893     self._WriteConfig()
894
895   @locking.ssynchronized(_config_lock)
896   def MarkInstanceDown(self, instance_name):
897     """Mark the status of an instance to down in the configuration.
898
899     """
900     self._SetInstanceStatus(instance_name, False)
901
902   def _UnlockedGetInstanceList(self):
903     """Get the list of instances.
904
905     This function is for internal use, when the config lock is already held.
906
907     """
908     return self._config_data.instances.keys()
909
910   @locking.ssynchronized(_config_lock, shared=1)
911   def GetInstanceList(self):
912     """Get the list of instances.
913
914     @return: array of instances, ex. ['instance2.example.com',
915         'instance1.example.com']
916
917     """
918     return self._UnlockedGetInstanceList()
919
920   @locking.ssynchronized(_config_lock, shared=1)
921   def ExpandInstanceName(self, short_name):
922     """Attempt to expand an incomplete instance name.
923
924     """
925     return utils.MatchNameComponent(short_name,
926                                     self._config_data.instances.keys(),
927                                     case_sensitive=False)
928
929   def _UnlockedGetInstanceInfo(self, instance_name):
930     """Returns information about an instance.
931
932     This function is for internal use, when the config lock is already held.
933
934     """
935     if instance_name not in self._config_data.instances:
936       return None
937
938     return self._config_data.instances[instance_name]
939
940   @locking.ssynchronized(_config_lock, shared=1)
941   def GetInstanceInfo(self, instance_name):
942     """Returns information about an instance.
943
944     It takes the information from the configuration file. Other information of
945     an instance are taken from the live systems.
946
947     @param instance_name: name of the instance, e.g.
948         I{instance1.example.com}
949
950     @rtype: L{objects.Instance}
951     @return: the instance object
952
953     """
954     return self._UnlockedGetInstanceInfo(instance_name)
955
956   @locking.ssynchronized(_config_lock, shared=1)
957   def GetAllInstancesInfo(self):
958     """Get the configuration of all instances.
959
960     @rtype: dict
961     @return: dict of (instance, instance_info), where instance_info is what
962               would GetInstanceInfo return for the node
963
964     """
965     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
966                     for instance in self._UnlockedGetInstanceList()])
967     return my_dict
968
969   @locking.ssynchronized(_config_lock)
970   def AddNode(self, node, ec_id):
971     """Add a node to the configuration.
972
973     @type node: L{objects.Node}
974     @param node: a Node instance
975
976     """
977     logging.info("Adding node %s to configuration", node.name)
978
979     self._EnsureUUID(node, ec_id)
980
981     node.serial_no = 1
982     node.ctime = node.mtime = time.time()
983     self._config_data.nodes[node.name] = node
984     self._config_data.cluster.serial_no += 1
985     self._WriteConfig()
986
987   @locking.ssynchronized(_config_lock)
988   def RemoveNode(self, node_name):
989     """Remove a node from the configuration.
990
991     """
992     logging.info("Removing node %s from configuration", node_name)
993
994     if node_name not in self._config_data.nodes:
995       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
996
997     del self._config_data.nodes[node_name]
998     self._config_data.cluster.serial_no += 1
999     self._WriteConfig()
1000
1001   @locking.ssynchronized(_config_lock, shared=1)
1002   def ExpandNodeName(self, short_name):
1003     """Attempt to expand an incomplete instance name.
1004
1005     """
1006     return utils.MatchNameComponent(short_name,
1007                                     self._config_data.nodes.keys(),
1008                                     case_sensitive=False)
1009
1010   def _UnlockedGetNodeInfo(self, node_name):
1011     """Get the configuration of a node, as stored in the config.
1012
1013     This function is for internal use, when the config lock is already
1014     held.
1015
1016     @param node_name: the node name, e.g. I{node1.example.com}
1017
1018     @rtype: L{objects.Node}
1019     @return: the node object
1020
1021     """
1022     if node_name not in self._config_data.nodes:
1023       return None
1024
1025     return self._config_data.nodes[node_name]
1026
1027   @locking.ssynchronized(_config_lock, shared=1)
1028   def GetNodeInfo(self, node_name):
1029     """Get the configuration of a node, as stored in the config.
1030
1031     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
1032
1033     @param node_name: the node name, e.g. I{node1.example.com}
1034
1035     @rtype: L{objects.Node}
1036     @return: the node object
1037
1038     """
1039     return self._UnlockedGetNodeInfo(node_name)
1040
1041   def _UnlockedGetNodeList(self):
1042     """Return the list of nodes which are in the configuration.
1043
1044     This function is for internal use, when the config lock is already
1045     held.
1046
1047     @rtype: list
1048
1049     """
1050     return self._config_data.nodes.keys()
1051
1052   @locking.ssynchronized(_config_lock, shared=1)
1053   def GetNodeList(self):
1054     """Return the list of nodes which are in the configuration.
1055
1056     """
1057     return self._UnlockedGetNodeList()
1058
1059   @locking.ssynchronized(_config_lock, shared=1)
1060   def GetOnlineNodeList(self):
1061     """Return the list of nodes which are online.
1062
1063     """
1064     all_nodes = [self._UnlockedGetNodeInfo(node)
1065                  for node in self._UnlockedGetNodeList()]
1066     return [node.name for node in all_nodes if not node.offline]
1067
1068   @locking.ssynchronized(_config_lock, shared=1)
1069   def GetAllNodesInfo(self):
1070     """Get the configuration of all nodes.
1071
1072     @rtype: dict
1073     @return: dict of (node, node_info), where node_info is what
1074               would GetNodeInfo return for the node
1075
1076     """
1077     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
1078                     for node in self._UnlockedGetNodeList()])
1079     return my_dict
1080
1081   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
1082     """Get the number of current and maximum desired and possible candidates.
1083
1084     @type exceptions: list
1085     @param exceptions: if passed, list of nodes that should be ignored
1086     @rtype: tuple
1087     @return: tuple of (current, desired and possible, possible)
1088
1089     """
1090     mc_now = mc_should = mc_max = 0
1091     for node in self._config_data.nodes.values():
1092       if exceptions and node.name in exceptions:
1093         continue
1094       if not (node.offline or node.drained):
1095         mc_max += 1
1096       if node.master_candidate:
1097         mc_now += 1
1098     mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
1099     return (mc_now, mc_should, mc_max)
1100
1101   @locking.ssynchronized(_config_lock, shared=1)
1102   def GetMasterCandidateStats(self, exceptions=None):
1103     """Get the number of current and maximum possible candidates.
1104
1105     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1106
1107     @type exceptions: list
1108     @param exceptions: if passed, list of nodes that should be ignored
1109     @rtype: tuple
1110     @return: tuple of (current, max)
1111
1112     """
1113     return self._UnlockedGetMasterCandidateStats(exceptions)
1114
1115   @locking.ssynchronized(_config_lock)
1116   def MaintainCandidatePool(self, exceptions):
1117     """Try to grow the candidate pool to the desired size.
1118
1119     @type exceptions: list
1120     @param exceptions: if passed, list of nodes that should be ignored
1121     @rtype: list
1122     @return: list with the adjusted nodes (L{objects.Node} instances)
1123
1124     """
1125     mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
1126     mod_list = []
1127     if mc_now < mc_max:
1128       node_list = self._config_data.nodes.keys()
1129       random.shuffle(node_list)
1130       for name in node_list:
1131         if mc_now >= mc_max:
1132           break
1133         node = self._config_data.nodes[name]
1134         if (node.master_candidate or node.offline or node.drained or
1135             node.name in exceptions):
1136           continue
1137         mod_list.append(node)
1138         node.master_candidate = True
1139         node.serial_no += 1
1140         mc_now += 1
1141       if mc_now != mc_max:
1142         # this should not happen
1143         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1144                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1145       if mod_list:
1146         self._config_data.cluster.serial_no += 1
1147         self._WriteConfig()
1148
1149     return mod_list
1150
1151   def _BumpSerialNo(self):
1152     """Bump up the serial number of the config.
1153
1154     """
1155     self._config_data.serial_no += 1
1156     self._config_data.mtime = time.time()
1157
1158   def _AllUUIDObjects(self):
1159     """Returns all objects with uuid attributes.
1160
1161     """
1162     return (self._config_data.instances.values() +
1163             self._config_data.nodes.values() +
1164             [self._config_data.cluster])
1165
1166   def _OpenConfig(self):
1167     """Read the config data from disk.
1168
1169     """
1170     raw_data = utils.ReadFile(self._cfg_file)
1171
1172     try:
1173       data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1174     except Exception, err:
1175       raise errors.ConfigurationError(err)
1176
1177     # Make sure the configuration has the right version
1178     _ValidateConfig(data)
1179
1180     if (not hasattr(data, 'cluster') or
1181         not hasattr(data.cluster, 'rsahostkeypub')):
1182       raise errors.ConfigurationError("Incomplete configuration"
1183                                       " (missing cluster.rsahostkeypub)")
1184
1185     # Upgrade configuration if needed
1186     data.UpgradeConfig()
1187
1188     self._config_data = data
1189     # reset the last serial as -1 so that the next write will cause
1190     # ssconf update
1191     self._last_cluster_serial = -1
1192
1193     # And finally run our (custom) config upgrade sequence
1194     self._UpgradeConfig()
1195
1196   def _UpgradeConfig(self):
1197     """Run upgrade steps that cannot be done purely in the objects.
1198
1199     This is because some data elements need uniqueness across the
1200     whole configuration, etc.
1201
1202     @warning: this function will call L{_WriteConfig()}, so it needs
1203         to either be called with the lock held or from a safe place
1204         (the constructor)
1205
1206     """
1207     modified = False
1208     for item in self._AllUUIDObjects():
1209       if item.uuid is None:
1210         item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
1211         modified = True
1212     if modified:
1213       self._WriteConfig()
1214       # This is ok even if it acquires the internal lock, as _UpgradeConfig is
1215       # only called at config init time, without the lock held
1216       self.DropECReservations(_UPGRADE_CONFIG_JID)
1217
1218   def _DistributeConfig(self, feedback_fn):
1219     """Distribute the configuration to the other nodes.
1220
1221     Currently, this only copies the configuration file. In the future,
1222     it could be used to encapsulate the 2/3-phase update mechanism.
1223
1224     """
1225     if self._offline:
1226       return True
1227
1228     bad = False
1229
1230     node_list = []
1231     addr_list = []
1232     myhostname = self._my_hostname
1233     # we can skip checking whether _UnlockedGetNodeInfo returns None
1234     # since the node list comes from _UnlocketGetNodeList, and we are
1235     # called with the lock held, so no modifications should take place
1236     # in between
1237     for node_name in self._UnlockedGetNodeList():
1238       if node_name == myhostname:
1239         continue
1240       node_info = self._UnlockedGetNodeInfo(node_name)
1241       if not node_info.master_candidate:
1242         continue
1243       node_list.append(node_info.name)
1244       addr_list.append(node_info.primary_ip)
1245
1246     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1247                                             address_list=addr_list)
1248     for to_node, to_result in result.items():
1249       msg = to_result.fail_msg
1250       if msg:
1251         msg = ("Copy of file %s to node %s failed: %s" %
1252                (self._cfg_file, to_node, msg))
1253         logging.error(msg)
1254
1255         if feedback_fn:
1256           feedback_fn(msg)
1257
1258         bad = True
1259
1260     return not bad
1261
1262   def _WriteConfig(self, destination=None, feedback_fn=None):
1263     """Write the configuration data to persistent storage.
1264
1265     """
1266     assert feedback_fn is None or callable(feedback_fn)
1267
1268     # Warn on config errors, but don't abort the save - the
1269     # configuration has already been modified, and we can't revert;
1270     # the best we can do is to warn the user and save as is, leaving
1271     # recovery to the user
1272     config_errors = self._UnlockedVerifyConfig()
1273     if config_errors:
1274       errmsg = ("Configuration data is not consistent: %s" %
1275                 (utils.CommaJoin(config_errors)))
1276       logging.critical(errmsg)
1277       if feedback_fn:
1278         feedback_fn(errmsg)
1279
1280     if destination is None:
1281       destination = self._cfg_file
1282     self._BumpSerialNo()
1283     txt = serializer.Dump(self._config_data.ToDict())
1284
1285     utils.WriteFile(destination, data=txt)
1286
1287     self.write_count += 1
1288
1289     # and redistribute the config file to master candidates
1290     self._DistributeConfig(feedback_fn)
1291
1292     # Write ssconf files on all nodes (including locally)
1293     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1294       if not self._offline:
1295         result = rpc.RpcRunner.call_write_ssconf_files(
1296           self._UnlockedGetNodeList(),
1297           self._UnlockedGetSsconfValues())
1298
1299         for nname, nresu in result.items():
1300           msg = nresu.fail_msg
1301           if msg:
1302             errmsg = ("Error while uploading ssconf files to"
1303                       " node %s: %s" % (nname, msg))
1304             logging.warning(errmsg)
1305
1306             if feedback_fn:
1307               feedback_fn(errmsg)
1308
1309       self._last_cluster_serial = self._config_data.cluster.serial_no
1310
1311   def _UnlockedGetSsconfValues(self):
1312     """Return the values needed by ssconf.
1313
1314     @rtype: dict
1315     @return: a dictionary with keys the ssconf names and values their
1316         associated value
1317
1318     """
1319     fn = "\n".join
1320     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1321     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1322     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1323     node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1324                     for ninfo in node_info]
1325     node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1326                     for ninfo in node_info]
1327
1328     instance_data = fn(instance_names)
1329     off_data = fn(node.name for node in node_info if node.offline)
1330     on_data = fn(node.name for node in node_info if not node.offline)
1331     mc_data = fn(node.name for node in node_info if node.master_candidate)
1332     mc_ips_data = fn(node.primary_ip for node in node_info
1333                      if node.master_candidate)
1334     node_data = fn(node_names)
1335     node_pri_ips_data = fn(node_pri_ips)
1336     node_snd_ips_data = fn(node_snd_ips)
1337
1338     cluster = self._config_data.cluster
1339     cluster_tags = fn(cluster.GetTags())
1340     return {
1341       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1342       constants.SS_CLUSTER_TAGS: cluster_tags,
1343       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1344       constants.SS_MASTER_CANDIDATES: mc_data,
1345       constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1346       constants.SS_MASTER_IP: cluster.master_ip,
1347       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1348       constants.SS_MASTER_NODE: cluster.master_node,
1349       constants.SS_NODE_LIST: node_data,
1350       constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1351       constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1352       constants.SS_OFFLINE_NODES: off_data,
1353       constants.SS_ONLINE_NODES: on_data,
1354       constants.SS_INSTANCE_LIST: instance_data,
1355       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1356       }
1357
1358   @locking.ssynchronized(_config_lock, shared=1)
1359   def GetVGName(self):
1360     """Return the volume group name.
1361
1362     """
1363     return self._config_data.cluster.volume_group_name
1364
1365   @locking.ssynchronized(_config_lock)
1366   def SetVGName(self, vg_name):
1367     """Set the volume group name.
1368
1369     """
1370     self._config_data.cluster.volume_group_name = vg_name
1371     self._config_data.cluster.serial_no += 1
1372     self._WriteConfig()
1373
1374   @locking.ssynchronized(_config_lock, shared=1)
1375   def GetMACPrefix(self):
1376     """Return the mac prefix.
1377
1378     """
1379     return self._config_data.cluster.mac_prefix
1380
1381   @locking.ssynchronized(_config_lock, shared=1)
1382   def GetClusterInfo(self):
1383     """Returns information about the cluster
1384
1385     @rtype: L{objects.Cluster}
1386     @return: the cluster object
1387
1388     """
1389     return self._config_data.cluster
1390
1391   @locking.ssynchronized(_config_lock)
1392   def Update(self, target, feedback_fn):
1393     """Notify function to be called after updates.
1394
1395     This function must be called when an object (as returned by
1396     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1397     caller wants the modifications saved to the backing store. Note
1398     that all modified objects will be saved, but the target argument
1399     is the one the caller wants to ensure that it's saved.
1400
1401     @param target: an instance of either L{objects.Cluster},
1402         L{objects.Node} or L{objects.Instance} which is existing in
1403         the cluster
1404     @param feedback_fn: Callable feedback function
1405
1406     """
1407     if self._config_data is None:
1408       raise errors.ProgrammerError("Configuration file not read,"
1409                                    " cannot save.")
1410     update_serial = False
1411     if isinstance(target, objects.Cluster):
1412       test = target == self._config_data.cluster
1413     elif isinstance(target, objects.Node):
1414       test = target in self._config_data.nodes.values()
1415       update_serial = True
1416     elif isinstance(target, objects.Instance):
1417       test = target in self._config_data.instances.values()
1418     else:
1419       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1420                                    " ConfigWriter.Update" % type(target))
1421     if not test:
1422       raise errors.ConfigurationError("Configuration updated since object"
1423                                       " has been read or unknown object")
1424     target.serial_no += 1
1425     target.mtime = now = time.time()
1426
1427     if update_serial:
1428       # for node updates, we need to increase the cluster serial too
1429       self._config_data.cluster.serial_no += 1
1430       self._config_data.cluster.mtime = now
1431
1432     if isinstance(target, objects.Instance):
1433       self._UnlockedReleaseDRBDMinors(target.name)
1434
1435     self._WriteConfig(feedback_fn=feedback_fn)
1436
1437   @locking.ssynchronized(_config_lock)
1438   def DropECReservations(self, ec_id):
1439     """Drop per-execution-context reservations
1440
1441     """
1442     self._temporary_ids.DropECReservations(ec_id)
1443     self._temporary_macs.DropECReservations(ec_id)
1444     self._temporary_secrets.DropECReservations(ec_id)