lib/config.py @ 1e0d3321

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was slow and consumed too
much memory.

"""

# pylint: disable=R0904
# R0904: Too many public methods

import os
import random
import logging
import time
import itertools

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime
from ganeti import pathutils
from ganeti import network


_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource '%s'"
                                    % str(resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type.

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      # Note: without this decrement the while/else clause below could never
      # trigger, and a generator that keeps colliding would loop forever.
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
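
  # Editor's illustrative sketch (not part of the upstream module): typical
  # use from a job, with hypothetical values. Reserve() pins a value for one
  # execution context; Generate() retries until generate_one_fn yields a
  # value outside both `existing` and all current reservations:
  #
  #   trm = TemporaryReservationManager()
  #   trm.Reserve("job-42", "aa:00:00:11:22:33")
  #   trm.Reserved("aa:00:00:11:22:33")            # -> True
  #   secret = trm.Generate(set(), utils.GenerateSecret, "job-42")
  #   trm.DropECReservations("job-42")             # job done, release all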


def _MatchNameComponentIgnoreCase(short_name, names):
  """Wrapper around L{utils.text.MatchNameComponent}.

  """
  return utils.MatchNameComponent(short_name, names, case_sensitive=False)


def _CheckInstanceDiskIvNames(disks):
  """Checks if instance's disks' C{iv_name} attributes are in order.

  @type disks: list of L{objects.Disk}
  @param disks: List of disks
  @rtype: list of tuples; (int, string, string)
  @return: List of wrongly named disks, each tuple contains disk index,
    expected and actual name

  """
  result = []

  for (idx, disk) in enumerate(disks):
    exp_iv_name = "disk/%s" % idx
    if disk.iv_name != exp_iv_name:
      result.append((idx, exp_iv_name, disk.iv_name))

  return result
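
# Editor's note: for example, two disks carrying iv_names "disk/0" and
# "disk/2" yield [(1, "disk/1", "disk/2")] -- index, expected, actual name.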


class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
               accept_foreign=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = pathutils.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._getents = _getents
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = netutils.Hostname.GetSysName()
    self._last_cluster_serial = -1
    self._cfg_id = None
    self._context = None
    self._OpenConfig(accept_foreign)

  def _GetRpc(self, address_list):
    """Returns RPC runner for configuration.

    """
    return rpc.ConfigRunner(self._context, address_list)

  def SetContext(self, context):
    """Sets Ganeti context.

    """
    self._context = context

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(pathutils.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one MAC address.

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac
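
  # Editor's note: with a hypothetical mac_prefix of "aa:00:00" this yields
  # strings such as "aa:00:00:3f:07:c2" (prefix plus three random bytes).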

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNdParams(self, node):
    """Get the node params populated with cluster defaults.

    @type node: L{objects.Node}
    @param node: The node we want to know the params for
    @return: A dict with the filled in node params

    """
    nodegroup = self._UnlockedGetNodeGroup(node.group)
    return self._config_data.cluster.FillND(node, nodegroup)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceDiskParams(self, instance):
    """Get the disk params populated with inherit chain.

    @type instance: L{objects.Instance}
    @param instance: The instance we want to know the params for
    @return: A dict with the filled in disk params

    """
    node = self._UnlockedGetNodeInfo(instance.primary_node)
    nodegroup = self._UnlockedGetNodeGroup(node.group)
    return self._UnlockedGetGroupDiskParams(nodegroup)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetGroupDiskParams(self, group):
    """Get the disk params populated with inherit chain.

    @type group: L{objects.NodeGroup}
    @param group: The group we want to know the params for
    @return: A dict with the filled in disk params

    """
    return self._UnlockedGetGroupDiskParams(group)

  def _UnlockedGetGroupDiskParams(self, group):
    """Get the disk params populated with inherit chain down to node-group.

    @type group: L{objects.NodeGroup}
    @param group: The group we want to know the params for
    @return: A dict with the filled in disk params

    """
    return self._config_data.cluster.SimpleFillDP(group.diskparams)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current node, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: unique id for the job to reserve the id to

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result
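
  # Editor's note: l_ids and p_ids are shared accumulators, so when
  # _UnlockedVerifyConfig calls this over every disk of every instance, a
  # duplicate is reported on the second sighting of a logical/physical id.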

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    # pylint: disable=R0914
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    cluster = data.cluster
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(cluster.enabled_hypervisors) -
                   set(cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    cluster.master_node)

    def _helper(owner, attr, value, template):
      try:
        utils.ForceDictType(value, template)
      except errors.GenericError, err:
        result.append("%s has invalid %s: %s" % (owner, attr, err))

    def _helper_nic(owner, params):
      try:
        objects.NIC.CheckParameterSyntax(params)
      except errors.ConfigurationError, err:
        result.append("%s has invalid nicparams: %s" % (owner, err))

    def _helper_ipolicy(owner, params, check_std):
      try:
        objects.InstancePolicy.CheckParameterSyntax(params, check_std)
      except errors.ConfigurationError, err:
        result.append("%s has invalid instance policy: %s" % (owner, err))

    def _helper_ispecs(owner, params):
      for key, value in params.items():
        if key in constants.IPOLICY_ISPECS:
          fullkey = "ipolicy/" + key
          _helper(owner, fullkey, value, constants.ISPECS_PARAMETER_TYPES)
        else:
          # FIXME: assuming list type
          if key in constants.IPOLICY_PARAMETERS:
            exp_type = float
          else:
            exp_type = list
          if not isinstance(value, exp_type):
            result.append("%s has invalid instance policy: for %s,"
                          " expecting %s, got %s" %
                          (owner, key, exp_type.__name__, type(value)))

    # check cluster parameters
    _helper("cluster", "beparams", cluster.SimpleFillBE({}),
            constants.BES_PARAMETER_TYPES)
    _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
            constants.NICS_PARAMETER_TYPES)
    _helper_nic("cluster", cluster.SimpleFillNIC({}))
    _helper("cluster", "ndparams", cluster.SimpleFillND({}),
            constants.NDS_PARAMETER_TYPES)
    _helper_ipolicy("cluster", cluster.SimpleFillIPolicy({}), True)
    _helper_ispecs("cluster", cluster.SimpleFillIPolicy({}))

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)
        if nic.nicparams:
          filled = cluster.SimpleFillNIC(nic.nicparams)
          owner = "instance %s nic %d" % (instance.name, idx)
          _helper(owner, "nicparams",
                  filled, constants.NICS_PARAMETER_TYPES)
          _helper_nic(owner, filled)

      # parameter checks
      if instance.beparams:
        _helper("instance %s" % instance.name, "beparams",
                cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)

      # gather the drbd ports for duplicate checks
      for (idx, dsk) in enumerate(instance.disks):
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % idx))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

      wrong_names = _CheckInstanceDiskIvNames(instance.disks)
      if wrong_names:
        tmp = "; ".join(("name of disk %s should be '%s', but is '%s'" %
                         (idx, exp_name, actual_name))
                        for (idx, exp_name, actual_name) in wrong_names)

        result.append("Instance '%s' has wrongly named disks: %s" %
                      (instance.name, tmp))

    # cluster-wide pool of free ports
    for free_port in cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (cluster.highest_used_port, keys[-1]))

    if not data.nodes[cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))
      if node.group not in data.nodegroups:
        result.append("Node '%s' has invalid group '%s'" %
                      (node.name, node.group))
      else:
        _helper("node %s" % node.name, "ndparams",
                cluster.FillND(node, data.nodegroups[node.group]),
                constants.NDS_PARAMETER_TYPES)

    # nodegroups checks
    nodegroups_names = set()
    for nodegroup_uuid in data.nodegroups:
      nodegroup = data.nodegroups[nodegroup_uuid]
      if nodegroup.uuid != nodegroup_uuid:
        result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
                      % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
      if utils.UUID_RE.match(nodegroup.name.lower()):
        result.append("node group '%s' (uuid: '%s') has uuid-like name" %
                      (nodegroup.name, nodegroup.uuid))
      if nodegroup.name in nodegroups_names:
        result.append("duplicate node group name '%s'" % nodegroup.name)
      else:
        nodegroups_names.add(nodegroup.name)
      group_name = "group %s" % nodegroup.name
      _helper_ipolicy(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy),
                      False)
      _helper_ispecs(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy))
      if nodegroup.ndparams:
        _helper(group_name, "ndparams",
                cluster.SimpleFillND(nodegroup.ndparams),
                constants.NDS_PARAMETER_TYPES)

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()
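
  # Editor's illustrative sketch (not part of the upstream module): callers
  # treat a non-empty result as a corrupt configuration, e.g.:
  #
  #   errs = cfg.VerifyConfig()
  #   if errs:
  #     raise errors.ConfigurationError("Configuration errors: %s" %
  #                                     utils.CommaJoin(errs))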

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device does not know node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    @warning: this method does not "flush" the configuration (via
        L{_WriteConfig}); callers should do that themselves once the
        configuration is stable

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port
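
  # Editor's illustrative sketch (not part of the upstream module): the pool
  # is drained before the counter grows, e.g. with a hypothetical port:
  #
  #   cfg.AddTcpUdpPort(11005)   # back into the pool (no config flush)
  #   cfg.AllocatePort()         # -> 11005, taken from the pool
  #   cfg.AllocatePort()         # pool empty -> highest_used_port + 1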

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty dict), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty dict).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
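
  # Editor's note: the d_map used above is keyed by node name, then by minor,
  # e.g. (hypothetical names):
  #
  #   {"node1.example.com": {0: "inst1.example.com", 1: "inst2.example.com"},
  #    "node2.example.com": {0: "inst1.example.com"}}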

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetmask(self):
    """Get the netmask of the master node for this cluster.

    """
    return self._config_data.cluster.master_netmask

  @locking.ssynchronized(_config_lock, shared=1)
  def GetUseExternalMipScript(self):
    """Get flag representing whether to use the external master IP setup script.

    """
    return self._config_data.cluster.use_external_mip_script

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetSharedFileStorageDir(self):
    """Get the shared file storage dir for this cluster.

    """
    return self._config_data.cluster.shared_file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPrimaryIPFamily(self):
    """Get cluster primary ip family.

    @return: primary ip family

    """
    return self._config_data.cluster.primary_ip_family

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetworkParameters(self):
    """Get network parameters of the master node.

    @rtype: L{objects.MasterNetworkParameters}
    @return: network parameters of the master node

    """
    cluster = self._config_data.cluster
    result = objects.MasterNetworkParameters(
      name=cluster.master_node, ip=cluster.master_ip,
      netmask=cluster.master_netmask, netdev=cluster.master_netdev,
      ip_family=cluster.primary_ip_family)

    return result

  @locking.ssynchronized(_config_lock)
  def AddNodeGroup(self, group, ec_id, check_uuid=True):
    """Add a node group to the configuration.

    This method calls group.UpgradeConfig() to fill any missing attributes
    according to their default values.

    @type group: L{objects.NodeGroup}
    @param group: the NodeGroup object to add
    @type ec_id: string
    @param ec_id: unique id for the job to use when creating a missing UUID
    @type check_uuid: bool
    @param check_uuid: add an UUID to the group if it doesn't have one or, if
                       it does, ensure that it does not exist in the
                       configuration already

    """
    self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
    self._WriteConfig()

  def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
    """Add a node group to the configuration.

    """
    logging.info("Adding node group %s to configuration", group.name)

    # Some code might need to add a node group with a pre-populated UUID
    # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
    # the "does this UUID exist already" check.
    if check_uuid:
      self._EnsureUUID(group, ec_id)

    try:
      existing_uuid = self._UnlockedLookupNodeGroup(group.name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                                 " node group (UUID: %s)" %
                                 (group.name, existing_uuid),
                                 errors.ECODE_EXISTS)

    group.serial_no = 1
    group.ctime = group.mtime = time.time()
    group.UpgradeConfig()

    self._config_data.nodegroups[group.uuid] = group
    self._config_data.cluster.serial_no += 1

  @locking.ssynchronized(_config_lock)
  def RemoveNodeGroup(self, group_uuid):
    """Remove a node group from the configuration.

    @type group_uuid: string
    @param group_uuid: the UUID of the node group to remove

    """
    logging.info("Removing node group %s from configuration", group_uuid)

    if group_uuid not in self._config_data.nodegroups:
      raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)

    assert len(self._config_data.nodegroups) != 1, \
            "Group '%s' is the only group, cannot be removed" % group_uuid

    del self._config_data.nodegroups[group_uuid]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  def _UnlockedLookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID
    @raise errors.OpPrereqError: when the target group cannot be found

    """
    if target is None:
      if len(self._config_data.nodegroups) != 1:
        raise errors.OpPrereqError("More than one node group exists. Target"
                                   " group must be specified explicitly.")
      else:
        return self._config_data.nodegroups.keys()[0]
    if target in self._config_data.nodegroups:
      return target
    for nodegroup in self._config_data.nodegroups.values():
      if nodegroup.name == target:
        return nodegroup.uuid
    raise errors.OpPrereqError("Node group '%s' not found" % target,
                               errors.ECODE_NOENT)

  @locking.ssynchronized(_config_lock, shared=1)
  def LookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    This function is just a wrapper over L{_UnlockedLookupNodeGroup}.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID

    """
    return self._UnlockedLookupNodeGroup(target)
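
  # Editor's illustrative sketch (not part of the upstream module):
  # LookupNodeGroup accepts a group name, a UUID, or None (the latter only
  # when a single group exists), always returning the UUID:
  #
  #   cfg.LookupNodeGroup("default")  # -> that group's UUID
  #   cfg.LookupNodeGroup(None)       # -> sole group's UUID, else OpPrereqError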

  def _UnlockedGetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    if uuid not in self._config_data.nodegroups:
      return None

    return self._config_data.nodegroups[uuid]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    return self._UnlockedGetNodeGroup(uuid)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodeGroupsInfo(self):
    """Get the configuration of all node groups.

    """
    return dict(self._config_data.nodegroups)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupList(self):
    """Get a list of node groups.

    """
    return self._config_data.nodegroups.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupMembersByNodes(self, nodes):
    """Get nodes which are members of the same nodegroups as the given nodes.

    """
    ngfn = lambda node_name: self._UnlockedGetNodeInfo(node_name).group
    return frozenset(member_name
                     for node_name in nodes
                     for member_name in
                       self._UnlockedGetNodeGroup(ngfn(node_name)).members)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiNodeGroupInfo(self, group_uuids):
    """Get the configuration of multiple node groups.

    @param group_uuids: List of node group UUIDs
    @rtype: list
    @return: List of tuples of (group_uuid, group_info)

    """
    return [(uuid, self._UnlockedGetNodeGroup(uuid)) for uuid in group_uuids]

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert status in constants.ADMINST_ALL, \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_state != status:
      instance.admin_state = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_UP)

  @locking.ssynchronized(_config_lock)
  def MarkInstanceOffline(self, instance_name):
    """Mark the instance status to offline in the config.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_OFFLINE)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    # If a network port has been allocated to the instance,
    # return it to the pool of free ports.
    inst = self._config_data.instances[instance_name]
    network_port = getattr(inst, "network_port", None)
    if network_port is not None:
      self._config_data.cluster.tcpudp_port_pool.add(network_port)

    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)

    # Operate on a copy to not lose the instance object in case of a failure
    inst = self._config_data.instances[old_name].Copy()
    inst.name = new_name

    for (idx, disk) in enumerate(inst.disks):
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.logical_id = (disk.logical_id[0],
                           utils.PathJoin(file_storage_dir, inst.name,
                                          "disk%s" % idx))
        disk.physical_id = disk.logical_id

    # Actually replace instance object
    del self._config_data.instances[old_name]
    self._config_data.instances[inst.name] = inst

    # Force update of ssconf files
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_DOWN)
1371

    
1372
  def _UnlockedGetInstanceList(self):
1373
    """Get the list of instances.
1374

1375
    This function is for internal use, when the config lock is already held.
1376

1377
    """
1378
    return self._config_data.instances.keys()
1379

    
1380
  @locking.ssynchronized(_config_lock, shared=1)
1381
  def GetInstanceList(self):
1382
    """Get the list of instances.
1383

1384
    @return: array of instances, ex. ['instance2.example.com',
1385
        'instance1.example.com']
1386

1387
    """
1388
    return self._UnlockedGetInstanceList()
1389

    
1390
  def ExpandInstanceName(self, short_name):
1391
    """Attempt to expand an incomplete instance name.
1392

1393
    """
1394
    # Locking is done in L{ConfigWriter.GetInstanceList}
1395
    return _MatchNameComponentIgnoreCase(short_name, self.GetInstanceList())
1396

    
1397
  def _UnlockedGetInstanceInfo(self, instance_name):
1398
    """Returns information about an instance.
1399

1400
    This function is for internal use, when the config lock is already held.
1401

1402
    """
1403
    if instance_name not in self._config_data.instances:
1404
      return None
1405

    
1406
    return self._config_data.instances[instance_name]
1407

    
1408
  @locking.ssynchronized(_config_lock, shared=1)
1409
  def GetInstanceInfo(self, instance_name):
1410
    """Returns information about an instance.
1411

1412
    It takes the information from the configuration file. Other information of
1413
    an instance are taken from the live systems.
1414

1415
    @param instance_name: name of the instance, e.g.
1416
        I{instance1.example.com}
1417

1418
    @rtype: L{objects.Instance}
1419
    @return: the instance object
1420

1421
    """
1422
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceNodeGroups(self, instance_name, primary_only=False):
    """Returns set of node group UUIDs for instance's nodes.

    @rtype: frozenset

    """
    instance = self._UnlockedGetInstanceInfo(instance_name)
    if not instance:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    if primary_only:
      nodes = [instance.primary_node]
    else:
      nodes = instance.all_nodes

    return frozenset(self._UnlockedGetNodeInfo(node_name).group
                     for node_name in nodes)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiInstanceInfo(self, instances):
    """Get the configuration of multiple instances.

    @param instances: list of instance names
    @rtype: list
    @return: list of tuples (instance, instance_info), where instance_info
        is what GetInstanceInfo would return for the instance, keeping the
        original order

    """
    return [(name, self._UnlockedGetInstanceInfo(name)) for name in instances]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
        GetInstanceInfo would return for the instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstancesInfoByFilter(self, filter_fn):
    """Get instance configuration with a filter.

    @type filter_fn: callable
    @param filter_fn: Filter function receiving an instance object as its
      parameter and returning a boolean. Important: this function is called
      while the config lock is held. It must not do any complex work or call
      functions potentially leading to a deadlock. Ideally it doesn't call
      any other functions and just compares instance attributes.

    """
    return dict((name, inst)
                for (name, inst) in self._config_data.instances.items()
                if filter_fn(inst))
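
  # A minimal filter sketch (hypothetical caller code, not part of this
  # module); as required above, the callable only compares instance
  # attributes while the config lock is held:
  #
  #   on_node1 = cfg.GetInstancesInfoByFilter(
  #     lambda inst: inst.primary_node == "node1.example.com")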

  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._UnlockedAddNodeToGroup(node.name, node.group)
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    # Locking is done in L{ConfigWriter.GetNodeList}
    return _MatchNameComponentIgnoreCase(short_name, self.GetNodeList())

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInstances(self, node_name):
    """Get the instances of a node, as stored in the config.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: (list, list)
    @return: a tuple with two lists: the primary and the secondary instances

    """
    pri = []
    sec = []
    for inst in self._config_data.instances.values():
      if inst.primary_node == node_name:
        pri.append(inst.name)
      if node_name in inst.secondary_nodes:
        sec.append(inst.name)
    return (pri, sec)
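
  # A minimal usage sketch (hypothetical caller code, not part of this
  # module); the two lists allow treating the node's roles differently:
  #
  #   (pri, sec) = cfg.GetNodeInstances("node1.example.com")
  #   for inst_name in pri:
  #     pass  # e.g. migrate instances running on the node
  #   for inst_name in sec:
  #     pass  # e.g. replace the node as a mirror secondary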

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupInstances(self, uuid, primary_only=False):
    """Get the instances of a node group.

    @param uuid: Node group UUID
    @param primary_only: Whether to only consider primary nodes
    @rtype: frozenset
    @return: Set of instance names in the node group

    """
    if primary_only:
      nodes_fn = lambda inst: [inst.primary_node]
    else:
      nodes_fn = lambda inst: inst.all_nodes

    return frozenset(inst.name
                     for inst in self._config_data.instances.values()
                     for node_name in nodes_fn(inst)
                     if self._UnlockedGetNodeInfo(node_name).group == uuid)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVmCapableNodeList(self):
    """Return the list of nodes which are vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNonVmCapableNodeList(self):
    """Return the list of nodes which are not vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiNodeInfo(self, nodes):
    """Get the configuration of multiple nodes.

    @param nodes: list of node names
    @rtype: list
    @return: list of tuples of (node, node_info), where node_info is
        what GetNodeInfo would return for the node, in the original
        order

    """
    return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
        GetNodeInfo would return for the node

    """
    return self._UnlockedGetAllNodesInfo()

  def _UnlockedGetAllNodesInfo(self):
    """Gets configuration of all nodes.

    @note: See L{GetAllNodesInfo}

    """
    return dict([(node, self._UnlockedGetNodeInfo(node))
                 for node in self._UnlockedGetNodeList()])

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupsFromNodes(self, nodes):
    """Returns groups for a list of nodes.

    @type nodes: list of string
    @param nodes: List of node names
    @rtype: frozenset

    """
    return frozenset(self._UnlockedGetNodeInfo(name).group for name in nodes)

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the numbers of current, desired and possible master candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible), where desired is
        capped at what is possible

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained) and node.master_capable:
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the numbers of current, desired and possible master candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)
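
  # How a caller would read the stats tuple (hypothetical caller code, not
  # part of this module):
  #
  #   (mc_now, mc_should, mc_max) = cfg.GetMasterCandidateStats()
  #   # mc_now: nodes currently marked as master candidates
  #   # mc_should: desired pool size, capped at what is possible
  #   # mc_max: master-capable nodes that are neither offline nor drained
  #   if mc_now < mc_should:
  #     pass  # pool should be grown, see MaintainCandidatePool below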

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    # the second element of the stats tuple (the desired pool size) is bound
    # to mc_max here; growing stops once the desired size is reached
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions or not node.master_capable):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

  def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
    """Add a given node to the specified group.

    """
    if nodegroup_uuid not in self._config_data.nodegroups:
      # This can happen if a node group gets deleted between its lookup and
      # when we're adding the first node to it, since we don't keep a lock in
      # the meantime. It's ok though, as we'll fail cleanly if the node group
      # is not found anymore.
      raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
    if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
      self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)

  def _UnlockedRemoveNodeFromGroup(self, node):
    """Remove a given node from its group.

    """
    nodegroup = node.group
    if nodegroup not in self._config_data.nodegroups:
      logging.warning("Warning: node '%s' has unknown node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
      # there is no group object to update; indexing the unknown group below
      # would raise a KeyError, so bail out after warning
      return
    nodegroup_obj = self._config_data.nodegroups[nodegroup]
    if node.name not in nodegroup_obj.members:
      logging.warning("Warning: node '%s' not a member of its node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
    else:
      nodegroup_obj.members.remove(node.name)

  @locking.ssynchronized(_config_lock)
  def AssignGroupNodes(self, mods):
    """Changes the group of a number of nodes.

    @type mods: list of tuples; (node name, new group UUID)
    @param mods: Node membership modifications

    """
    groups = self._config_data.nodegroups
    nodes = self._config_data.nodes

    resmod = []

    # Try to resolve names/UUIDs first
    for (node_name, new_group_uuid) in mods:
      try:
        node = nodes[node_name]
      except KeyError:
        raise errors.ConfigurationError("Unable to find node '%s'" % node_name)

      if node.group == new_group_uuid:
        # Node is being assigned to its current group
        logging.debug("Node '%s' was assigned to its current group (%s)",
                      node_name, node.group)
        continue

      # Try to find current group of node
      try:
        old_group = groups[node.group]
      except KeyError:
        raise errors.ConfigurationError("Unable to find old group '%s'" %
                                        node.group)

      # Try to find new group for node
      try:
        new_group = groups[new_group_uuid]
      except KeyError:
        raise errors.ConfigurationError("Unable to find new group '%s'" %
                                        new_group_uuid)

      assert node.name in old_group.members, \
        ("Inconsistent configuration: node '%s' not listed in members for its"
         " old group '%s'" % (node.name, old_group.uuid))
      assert node.name not in new_group.members, \
        ("Inconsistent configuration: node '%s' already listed in members for"
         " its new group '%s'" % (node.name, new_group.uuid))

      resmod.append((node, old_group, new_group))

    # Apply changes
    for (node, old_group, new_group) in resmod:
      assert node.group != new_group.uuid and old_group.uuid != new_group.uuid, \
        "Assigning to current group is not possible"

      node.group = new_group.uuid

      # Update members of involved groups
      if node.name in old_group.members:
        old_group.members.remove(node.name)
      if node.name not in new_group.members:
        new_group.members.append(node.name)

    # Update timestamps and serials (only once per node/group object)
    now = time.time()
    for obj in frozenset(itertools.chain(*resmod)): # pylint: disable=W0142
      obj.serial_no += 1
      obj.mtime = now

    # Force ssconf update
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()
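
  # A minimal call sketch (hypothetical caller code, not part of this
  # module); each modification pairs a node name with its new group's UUID:
  #
  #   cfg.AssignGroupNodes([("node1.example.com", new_group_uuid),
  #                         ("node2.example.com", new_group_uuid)])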

  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            self._config_data.nodegroups.values() +
            [self._config_data.cluster])

  def _OpenConfig(self, accept_foreign):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, "cluster") or
        not hasattr(data.cluster, "rsahostkeypub")):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    if data.cluster.master_node != self._my_hostname and not accept_foreign:
      msg = ("The configuration denotes node %s as master, while my"
             " hostname is %s; opening a foreign configuration is only"
             " possible in accept_foreign mode" %
             (data.cluster.master_node, self._my_hostname))
      raise errors.ConfigurationError(msg)

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

    self._cfg_id = utils.GetFileID(path=self._cfg_file)

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if not self._config_data.nodegroups:
      default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
      default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
                                            members=[])
      self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
      modified = True
    for node in self._config_data.nodes.values():
      if not node.group:
        node.group = self.LookupNodeGroup(None)
        modified = True
      # This is technically *not* an upgrade, but needs to be done both when
      # nodegroups are being added, and upon normally loading the config,
      # because the members list of a node group is discarded upon
      # serializing/deserializing the object.
      self._UnlockedAddNodeToGroup(node.name, node.group)
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    # TODO: Use dedicated resolver talking to config writer for name resolution
    result = \
      self._GetRpc(addr_list).call_upload_file(node_list, self._cfg_file)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    getents = self._getents()
    try:
      fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
                               close=False, gid=getents.confd_gid, mode=0640)
    except errors.LockError:
      raise errors.ConfigurationError("The configuration file has been"
                                      " modified since the last write, cannot"
                                      " update")
    try:
      self._cfg_id = utils.GetFileID(fd=fd)
    finally:
      os.close(fd)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = self._GetRpc(None).call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no
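
  # Note on the write path above (descriptive only): _OpenConfig remembers
  # the file ID (utils.GetFileID) of the configuration file it read, and
  # utils.SafeWriteFile refuses the write (errors.LockError, surfaced here
  # as ConfigurationError) if the file on disk no longer matches that ID,
  # i.e. if something else modified it since the last read or write.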

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
                  self._config_data.nodegroups.values()]
    nodegroups_data = fn(utils.NiceSort(nodegroups))
    networks = ["%s %s" % (net.uuid, net.name) for net in
                self._config_data.networks.values()]
    networks_data = fn(utils.NiceSort(networks))

    ssconf_values = {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NETMASK: str(cluster.master_netmask),
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      constants.SS_NODEGROUPS: nodegroups_data,
      constants.SS_NETWORKS: networks_data,
      }
    bad_values = [(k, v) for k, v in ssconf_values.items()
                  if not isinstance(v, (str, basestring))]
    if bad_values:
      err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values)
      raise errors.ConfigurationError("Some ssconf key(s) have non-string"
                                      " values: %s" % err)
    return ssconf_values
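
  # Illustrative excerpt of the returned mapping (values are made up):
  #
  #   {
  #     constants.SS_CLUSTER_NAME: "cluster.example.com",
  #     constants.SS_MASTER_NODE: "node1.example.com",
  #     constants.SS_NODE_LIST: "node1.example.com\nnode2.example.com",
  #   }
  #
  # Every value must be a (possibly multi-line) string; the check above
  # enforces this before the ssconf files are written out.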

  @locking.ssynchronized(_config_lock, shared=1)
  def GetSsconfValues(self):
    """Wrapper using lock around L{_UnlockedGetSsconfValues}.

    """
    return self._UnlockedGetSsconfValues()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    """
    return self._config_data.cluster.drbd_usermode_helper

  @locking.ssynchronized(_config_lock)
  def SetDRBDHelper(self, drbd_helper):
    """Set DRBD usermode helper.

    """
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the MAC prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check if any disk of the given type exists in the configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)
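
  # A minimal usage sketch (hypothetical caller code; the constant name is
  # an assumption about the disk types defined in constants.py):
  #
  #   if cfg.HasAnyDiskOfType(constants.LD_DRBD8):
  #     pass  # e.g. require a working DRBD usermode helper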

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetClusterInfo) has been updated and
    the caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one whose saving the caller wants to ensure.

    @param target: an instance of L{objects.Cluster}, L{objects.Node},
        L{objects.Instance}, L{objects.NodeGroup} or L{objects.Network}
        existing in the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    elif isinstance(target, objects.NodeGroup):
      test = target in self._config_data.nodegroups.values()
    elif isinstance(target, objects.Network):
      test = target in self._config_data.networks.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)
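
  # The typical read-modify-write cycle (hypothetical caller code, not part
  # of this module):
  #
  #   node = cfg.GetNodeInfo("node1.example.com")
  #   node.offline = True
  #   cfg.Update(node, None)  # bumps serials, writes and distributes config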

  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations.

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNetworksInfo(self):
    """Get the configuration of all networks.

    """
    return dict(self._config_data.networks)

  def _UnlockedGetNetworkList(self):
    """Get the list of networks.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.networks.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNetworkList(self):
    """Get the list of networks.

    @return: list of network UUIDs

    """
    return self._UnlockedGetNetworkList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNetworkNames(self):
    """Get a list of network names.

    """
    names = [network.name
             for network in self._config_data.networks.values()]
    return names

  def _UnlockedGetNetwork(self, uuid):
    """Returns information about a network.

    This function is for internal use, when the config lock is already held.

    """
    if uuid not in self._config_data.networks:
      return None

    return self._config_data.networks[uuid]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNetwork(self, uuid):
    """Returns information about a network.

    It takes the information from the configuration file.

    @param uuid: UUID of the network

    @rtype: L{objects.Network}
    @return: the network object

    """
    return self._UnlockedGetNetwork(uuid)

  @locking.ssynchronized(_config_lock)
  def AddNetwork(self, net, ec_id, check_uuid=True):
    """Add a network to the configuration.

    @type net: L{objects.Network}
    @param net: the Network object to add
    @type ec_id: string
    @param ec_id: unique id for the job to use when creating a missing UUID

    """
    self._UnlockedAddNetwork(net, ec_id, check_uuid)
    self._WriteConfig()

  def _UnlockedAddNetwork(self, net, ec_id, check_uuid):
    """Add a network to the configuration.

    """
    logging.info("Adding network %s to configuration", net.name)

    if check_uuid:
      self._EnsureUUID(net, ec_id)

    existing_uuid = self._UnlockedLookupNetwork(net.name)
    if existing_uuid:
      raise errors.OpPrereqError("Desired network name '%s' already"
                                 " exists as a network (UUID: %s)" %
                                 (net.name, existing_uuid),
                                 errors.ECODE_EXISTS)
    net.serial_no = 1
    self._config_data.networks[net.uuid] = net
    self._config_data.cluster.serial_no += 1

  def _UnlockedLookupNetwork(self, target):
    """Lookup a network's UUID.

    @type target: string
    @param target: network name or UUID
    @rtype: string
    @return: network UUID, or None if the network is not found

    """
    if target in self._config_data.networks:
      return target
    for net in self._config_data.networks.values():
      if net.name == target:
        return net.uuid
    return None

  @locking.ssynchronized(_config_lock, shared=1)
  def LookupNetwork(self, target):
    """Lookup a network's UUID.

    This function is just a wrapper over L{_UnlockedLookupNetwork}.

    @type target: string
    @param target: network name or UUID
    @rtype: string
    @return: network UUID, or None if the network is not found

    """
    return self._UnlockedLookupNetwork(target)
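
  # A minimal usage sketch (hypothetical caller code, not part of this
  # module); the lookup accepts either form, so user input can be
  # normalised to a UUID:
  #
  #   uuid = cfg.LookupNetwork("vlan100")  # by name
  #   same = cfg.LookupNetwork(uuid)       # by UUID, returned unchanged
  #   if uuid is None:
  #     pass  # no such network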

  @locking.ssynchronized(_config_lock)
  def RemoveNetwork(self, network_uuid):
    """Remove a network from the configuration.

    @type network_uuid: string
    @param network_uuid: the UUID of the network to remove

    """
    logging.info("Removing network %s from configuration", network_uuid)

    if network_uuid not in self._config_data.networks:
      raise errors.ConfigurationError("Unknown network '%s'" % network_uuid)

    del self._config_data.networks[network_uuid]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()