#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was slow and consuming too
much memory.

"""

# pylint: disable=R0904
# R0904: Too many public methods

import os
import random
import logging
import time
import itertools

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime


_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource '%s'"
                                    % str(resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
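
# Example (illustrative sketch; the ec_id and resource values are
# hypothetical): a job reserves a resource under its execution-context id,
# and other jobs then see it as taken until the reservation is dropped.
#
#   trm = TemporaryReservationManager()
#   trm.Reserve("job-42", "aa:00:00:12:34:56")
#   trm.Reserved("aa:00:00:12:34:56")  # True, regardless of which ec_id asks
#   trm.DropECReservations("job-42")   # job done, its reservations go away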


def _MatchNameComponentIgnoreCase(short_name, names):
  """Wrapper around L{utils.text.MatchNameComponent}.

  """
  return utils.MatchNameComponent(short_name, names, case_sensitive=False)


def _CheckInstanceDiskIvNames(disks):
  """Checks if instance's disks' C{iv_name} attributes are in order.

  @type disks: list of L{objects.Disk}
  @param disks: List of disks
  @rtype: list of tuples; (int, string, string)
  @return: List of wrongly named disks, each tuple contains disk index,
    expected and actual name

  """
  result = []

  for (idx, disk) in enumerate(disks):
    exp_iv_name = "disk/%s" % idx
    if disk.iv_name != exp_iv_name:
      result.append((idx, exp_iv_name, disk.iv_name))

  return result


class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
               accept_foreign=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._getents = _getents
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = netutils.Hostname.GetSysName()
    self._last_cluster_serial = -1
    self._cfg_id = None
    self._context = None
    self._OpenConfig(accept_foreign)

  def _GetRpc(self, address_list):
    """Returns RPC runner for configuration.

    """
    return rpc.ConfigRunner(self._context, address_list)

  def SetContext(self, context):
    """Sets Ganeti context.

    """
    self._context = context

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one MAC address.

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac
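
  # Illustrative sketch: with a cluster mac_prefix of "aa:00:00", the method
  # above yields addresses such as "aa:00:00:3f:9a:02" (the prefix plus three
  # random, zero-padded hex bytes).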

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNdParams(self, node):
    """Get the node params populated with cluster defaults.

    @type node: L{objects.Node}
    @param node: The node we want to know the params for
    @return: A dict with the filled in node params

    """
    nodegroup = self._UnlockedGetNodeGroup(node.group)
    return self._config_data.cluster.FillND(node, nodegroup)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceDiskParams(self, instance):
    """Get the disk params populated with inherit chain.

    @type instance: L{objects.Instance}
    @param instance: The instance we want to know the params for
    @return: A dict with the filled in disk params

    """
    node = self._UnlockedGetNodeInfo(instance.primary_node)
    nodegroup = self._UnlockedGetNodeGroup(node.group)
    return self._config_data.cluster.SimpleFillDP(nodegroup.diskparams)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current node, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: unique id for the job to reserve the id to

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    # pylint: disable=R0914
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    cluster = data.cluster
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(cluster.enabled_hypervisors) -
                   set(cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    cluster.master_node)

    def _helper(owner, attr, value, template):
      try:
        utils.ForceDictType(value, template)
      except errors.GenericError, err:
        result.append("%s has invalid %s: %s" % (owner, attr, err))

    def _helper_nic(owner, params):
      try:
        objects.NIC.CheckParameterSyntax(params)
      except errors.ConfigurationError, err:
        result.append("%s has invalid nicparams: %s" % (owner, err))

    def _helper_ipolicy(owner, params):
      try:
        objects.InstancePolicy.CheckParameterSyntax(params)
      except errors.ConfigurationError, err:
        result.append("%s has invalid instance policy: %s" % (owner, err))

    def _helper_ispecs(owner, params):
      for key, value in params.items():
        if key in constants.IPOLICY_ISPECS:
          fullkey = "ipolicy/" + key
          _helper(owner, fullkey, value, constants.ISPECS_PARAMETER_TYPES)
        else:
          # FIXME: assuming list type
          if key in constants.IPOLICY_PARAMETERS:
            exp_type = float
          else:
            exp_type = list
          if not isinstance(value, exp_type):
            result.append("%s has invalid instance policy: for %s,"
                          " expecting %s, got %s" %
                          (owner, key, exp_type.__name__, type(value)))

    # check cluster parameters
    _helper("cluster", "beparams", cluster.SimpleFillBE({}),
            constants.BES_PARAMETER_TYPES)
    _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
            constants.NICS_PARAMETER_TYPES)
    _helper_nic("cluster", cluster.SimpleFillNIC({}))
    _helper("cluster", "ndparams", cluster.SimpleFillND({}),
            constants.NDS_PARAMETER_TYPES)
    _helper_ipolicy("cluster", cluster.SimpleFillIPolicy({}))
    _helper_ispecs("cluster", cluster.SimpleFillIPolicy({}))

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)
        if nic.nicparams:
          filled = cluster.SimpleFillNIC(nic.nicparams)
          owner = "instance %s nic %d" % (instance.name, idx)
          _helper(owner, "nicparams",
                  filled, constants.NICS_PARAMETER_TYPES)
          _helper_nic(owner, filled)

      # parameter checks
      if instance.beparams:
        _helper("instance %s" % instance.name, "beparams",
                cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)

      # gather the drbd ports for duplicate checks
      for (idx, dsk) in enumerate(instance.disks):
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % idx))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

      wrong_names = _CheckInstanceDiskIvNames(instance.disks)
      if wrong_names:
        tmp = "; ".join(("name of disk %s should be '%s', but is '%s'" %
                         (idx, exp_name, actual_name))
                        for (idx, exp_name, actual_name) in wrong_names)

        result.append("Instance '%s' has wrongly named disks: %s" %
                      (instance.name, tmp))

    # cluster-wide pool of free ports
    for free_port in cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (cluster.highest_used_port, keys[-1]))

    if not data.nodes[cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))
      if node.group not in data.nodegroups:
        result.append("Node '%s' has invalid group '%s'" %
                      (node.name, node.group))
      else:
        _helper("node %s" % node.name, "ndparams",
                cluster.FillND(node, data.nodegroups[node.group]),
                constants.NDS_PARAMETER_TYPES)

    # nodegroups checks
    nodegroups_names = set()
    for nodegroup_uuid in data.nodegroups:
      nodegroup = data.nodegroups[nodegroup_uuid]
      if nodegroup.uuid != nodegroup_uuid:
        result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
                      % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
      if utils.UUID_RE.match(nodegroup.name.lower()):
        result.append("node group '%s' (uuid: '%s') has uuid-like name" %
                      (nodegroup.name, nodegroup.uuid))
      if nodegroup.name in nodegroups_names:
        result.append("duplicate node group name '%s'" % nodegroup.name)
      else:
        nodegroups_names.add(nodegroup.name)
      group_name = "group %s" % nodegroup.name
      _helper_ipolicy(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy))
      _helper_ispecs(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy))
      if nodegroup.ndparams:
        _helper(group_name, "ndparams",
                cluster.SimpleFillND(nodegroup.ndparams),
                constants.NDS_PARAMETER_TYPES)

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()
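
  # Illustrative usage sketch (hypothetical caller code): a non-empty result
  # list means the on-disk configuration is inconsistent.
  #
  #   cfg = ConfigWriter(offline=True)
  #   errs = cfg.VerifyConfig()
  #   if errs:
  #     raise errors.ConfigurationError("invalid configuration: %s" %
  #                                     utils.CommaJoin(errs))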

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device does not know node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    @warning: this method does not "flush" the configuration (via
        L{_WriteConfig}); callers should do that themselves once the
        configuration is stable

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port
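
  # Illustrative sketch of the allocation order above (hypothetical numbers):
  # with tcpudp_port_pool == set([11005]) and highest_used_port == 11017, the
  # first AllocatePort() call returns 11005 (drained from the pool), the next
  # one returns 11018 and bumps highest_used_port to 11018.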

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates
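
  # Illustrative sketch of the return value (hypothetical node and instance
  # names):
  #
  #   ({"node1.example.com": {0: "inst1", 1: "inst2"},
  #     "node2.example.com": {0: "inst1"},
  #     "node3.example.com": {}},
  #    [])   # an empty duplicates list means the minor map is consistent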

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
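
  # Illustrative usage sketch (hypothetical names): reserving the minors for
  # both sides of a new DRBD8 disk returns one minor per listed node, in the
  # same order:
  #
  #   minors = cfg.AllocateDRBDMinor(["node1.example.com",
  #                                   "node2.example.com"], "inst1")
  #   # e.g. [0, 2]: the first free minor on each of the two nodes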

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths; on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetmask(self):
    """Get the netmask of the master node for this cluster.

    """
    return self._config_data.cluster.master_netmask

  @locking.ssynchronized(_config_lock, shared=1)
  def GetUseExternalMipScript(self):
    """Get the flag representing whether to use the external master IP setup
    script.

    """
    return self._config_data.cluster.use_external_mip_script

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetSharedFileStorageDir(self):
    """Get the shared file storage dir for this cluster.

    """
    return self._config_data.cluster.shared_file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPrimaryIPFamily(self):
    """Get cluster primary ip family.

    @return: primary ip family

    """
    return self._config_data.cluster.primary_ip_family

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetworkParameters(self):
    """Get network parameters of the master node.

    @rtype: L{objects.MasterNetworkParameters}
    @return: network parameters of the master node

    """
    cluster = self._config_data.cluster
    result = objects.MasterNetworkParameters(name=cluster.master_node,
      ip=cluster.master_ip,
      netmask=cluster.master_netmask,
      netdev=cluster.master_netdev,
      ip_family=cluster.primary_ip_family)

    return result

  @locking.ssynchronized(_config_lock)
  def AddNodeGroup(self, group, ec_id, check_uuid=True):
    """Add a node group to the configuration.

    This method calls group.UpgradeConfig() to fill any missing attributes
    according to their default values.

    @type group: L{objects.NodeGroup}
    @param group: the NodeGroup object to add
    @type ec_id: string
    @param ec_id: unique id for the job to use when creating a missing UUID
    @type check_uuid: bool
    @param check_uuid: add a UUID to the group if it doesn't have one or, if
                       it does, ensure that it does not exist in the
                       configuration already

    """
    self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
    self._WriteConfig()

  def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
    """Add a node group to the configuration.

    """
    logging.info("Adding node group %s to configuration", group.name)

    # Some code might need to add a node group with a pre-populated UUID
    # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
    # the "does this UUID exist already" check.
    if check_uuid:
      self._EnsureUUID(group, ec_id)

    try:
      existing_uuid = self._UnlockedLookupNodeGroup(group.name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                                 " node group (UUID: %s)" %
                                 (group.name, existing_uuid),
                                 errors.ECODE_EXISTS)

    group.serial_no = 1
    group.ctime = group.mtime = time.time()
    group.UpgradeConfig()

    self._config_data.nodegroups[group.uuid] = group
    self._config_data.cluster.serial_no += 1

  @locking.ssynchronized(_config_lock)
  def RemoveNodeGroup(self, group_uuid):
    """Remove a node group from the configuration.

    @type group_uuid: string
    @param group_uuid: the UUID of the node group to remove

    """
    logging.info("Removing node group %s from configuration", group_uuid)

    if group_uuid not in self._config_data.nodegroups:
      raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)

    assert len(self._config_data.nodegroups) != 1, \
            "Group '%s' is the only group, cannot be removed" % group_uuid

    del self._config_data.nodegroups[group_uuid]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  def _UnlockedLookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID
    @raises errors.OpPrereqError: when the target group cannot be found

    """
    if target is None:
      if len(self._config_data.nodegroups) != 1:
        raise errors.OpPrereqError("More than one node group exists. Target"
                                   " group must be specified explicitly.")
      else:
        return self._config_data.nodegroups.keys()[0]
    if target in self._config_data.nodegroups:
      return target
    for nodegroup in self._config_data.nodegroups.values():
      if nodegroup.name == target:
        return nodegroup.uuid
    raise errors.OpPrereqError("Node group '%s' not found" % target,
                               errors.ECODE_NOENT)

  @locking.ssynchronized(_config_lock, shared=1)
  def LookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    This function is just a wrapper over L{_UnlockedLookupNodeGroup}.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID

    """
    return self._UnlockedLookupNodeGroup(target)

  def _UnlockedGetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    if uuid not in self._config_data.nodegroups:
      return None

    return self._config_data.nodegroups[uuid]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    return self._UnlockedGetNodeGroup(uuid)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodeGroupsInfo(self):
    """Get the configuration of all node groups.

    """
    return dict(self._config_data.nodegroups)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupList(self):
    """Get a list of node groups.

    """
    return self._config_data.nodegroups.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupMembersByNodes(self, nodes):
    """Get nodes which are members of the same nodegroups as the given nodes.

    """
    ngfn = lambda node_name: self._UnlockedGetNodeInfo(node_name).group
    return frozenset(member_name
                     for node_name in nodes
                     for member_name in
                       self._UnlockedGetNodeGroup(ngfn(node_name)).members)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiNodeGroupInfo(self, group_uuids):
    """Get the configuration of multiple node groups.

    @param group_uuids: List of node group UUIDs
    @rtype: list
    @return: List of tuples of (group_uuid, group_info)

    """
    return [(uuid, self._UnlockedGetNodeGroup(uuid)) for uuid in group_uuids]

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert status in constants.ADMINST_ALL, \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_state != status:
      instance.admin_state = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status as up in the config.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_UP)

  @locking.ssynchronized(_config_lock)
  def MarkInstanceOffline(self, instance_name):
    """Mark the instance status as offline in the config.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_OFFLINE)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    # If a network port has been allocated to the instance,
    # return it to the pool of free ports.
    inst = self._config_data.instances[instance_name]
    network_port = getattr(inst, "network_port", None)
    if network_port is not None:
      self._config_data.cluster.tcpudp_port_pool.add(network_port)

    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)

    # Operate on a copy to not lose the instance object in case of a failure
    inst = self._config_data.instances[old_name].Copy()
    inst.name = new_name

    for (idx, disk) in enumerate(inst.disks):
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.logical_id = (disk.logical_id[0],
                           utils.PathJoin(file_storage_dir, inst.name,
                                          "disk%s" % idx))
        disk.physical_id = disk.logical_id

    # Actually replace instance object
    del self._config_data.instances[old_name]
    self._config_data.instances[inst.name] = inst

    # Force update of ssconf files
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()
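
  # Illustrative sketch of the file-disk rename above (hypothetical paths):
  # renaming "old" to "new" turns a logical_id of
  #   (constants.FD_LOOP, "/srv/ganeti/file-storage/old/disk0")
  # into
  #   (constants.FD_LOOP, "/srv/ganeti/file-storage/new/disk0")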

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance as down in the configuration.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_DOWN)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    # Locking is done in L{ConfigWriter.GetInstanceList}
    return _MatchNameComponentIgnoreCase(short_name, self.GetInstanceList())

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    about an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceNodeGroups(self, instance_name, primary_only=False):
    """Returns the set of node group UUIDs for an instance's nodes.

    @rtype: frozenset

    """
    instance = self._UnlockedGetInstanceInfo(instance_name)
    if not instance:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    if primary_only:
      nodes = [instance.primary_node]
    else:
      nodes = instance.all_nodes

    return frozenset(self._UnlockedGetNodeInfo(node_name).group
                     for node_name in nodes)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiInstanceInfo(self, instances):
    """Get the configuration of multiple instances.

    @param instances: list of instance names
    @rtype: list
    @return: list of tuples (instance, instance_info), where
        instance_info is what GetInstanceInfo would return for that
        instance, while keeping the original order

    """
    return [(name, self._UnlockedGetInstanceInfo(name)) for name in instances]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
              GetInstanceInfo would return for that instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstancesInfoByFilter(self, filter_fn):
    """Get instance configuration with a filter.

    @type filter_fn: callable
    @param filter_fn: Filter function receiving an instance object as its
      parameter and returning a boolean. Important: this function is called
      while the configuration lock is held. It must not do any complex work
      or call functions potentially leading to a deadlock. Ideally it doesn't
      call any other functions and just compares instance attributes.

    """
    return dict((name, inst)
                for (name, inst) in self._config_data.instances.items()
                if filter_fn(inst))
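
  # Illustrative usage sketch: the filter runs under the config lock, so it
  # should stay a plain attribute check, e.g.
  #
  #   running = cfg.GetInstancesInfoByFilter(
  #     lambda inst: inst.admin_state == constants.ADMINST_UP)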
1461

    
1462
  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._UnlockedAddNodeToGroup(node.name, node.group)
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

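  # Sketch of a caller (field values are purely illustrative and `ec_id`
  # comes from the calling logical unit; real callers build the Node
  # object with many more attributes):
  #
  #   node = objects.Node(name="node2.example.com",
  #                       primary_ip="192.0.2.2",
  #                       secondary_ip="198.51.100.2",
  #                       group=cfg.LookupNodeGroup(None),
  #                       master_candidate=False,
  #                       offline=False, drained=False)
  #   cfg.AddNode(node, ec_id)
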
  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    # Locking is done in L{ConfigWriter.GetNodeList}
    return _MatchNameComponentIgnoreCase(short_name, self.GetNodeList())

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInstances(self, node_name):
    """Get the instances of a node, as stored in the config.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: (list, list)
    @return: a tuple with two lists: the primary and the secondary instances

    """
    pri = []
    sec = []
    for inst in self._config_data.instances.values():
      if inst.primary_node == node_name:
        pri.append(inst.name)
      if node_name in inst.secondary_nodes:
        sec.append(inst.name)
    return (pri, sec)

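  # Usage sketch (assumes `cfg` is an initialized ConfigWriter):
  #
  #   (pri, sec) = cfg.GetNodeInstances("node1.example.com")
  #   # `pri` holds instances having the node as primary, `sec` those
  #   # using it as a secondary (e.g. as a DRBD mirror target)
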
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupInstances(self, uuid, primary_only=False):
    """Get the instances of a node group.

    @param uuid: Node group UUID
    @param primary_only: Whether to only consider primary nodes
    @rtype: frozenset
    @return: Set of instance names in the node group

    """
    if primary_only:
      nodes_fn = lambda inst: [inst.primary_node]
    else:
      nodes_fn = lambda inst: inst.all_nodes

    return frozenset(inst.name
                     for inst in self._config_data.instances.values()
                     for node_name in nodes_fn(inst)
                     if self._UnlockedGetNodeInfo(node_name).group == uuid)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVmCapableNodeList(self):
    """Return the list of nodes which are vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNonVmCapableNodeList(self):
    """Return the list of nodes which are not vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiNodeInfo(self, nodes):
    """Get the configuration of multiple nodes.

    @param nodes: list of node names
    @rtype: list
    @return: list of tuples of (node, node_info), where node_info is
        what L{GetNodeInfo} would return for the node, in the original
        order

    """
    return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
        L{GetNodeInfo} would return for the node

    """
    return self._UnlockedGetAllNodesInfo()

  def _UnlockedGetAllNodesInfo(self):
    """Gets configuration of all nodes.

    @note: See L{GetAllNodesInfo}

    """
    return dict([(node, self._UnlockedGetNodeInfo(node))
                 for node in self._UnlockedGetNodeList()])

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupsFromNodes(self, nodes):
    """Returns groups for a list of nodes.

    @type nodes: list of string
    @param nodes: List of node names
    @rtype: frozenset

    """
    return frozenset(self._UnlockedGetNodeInfo(name).group for name in nodes)

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current, desired and possible master candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible) candidate counts; the
        desired value is the configured candidate pool size, capped at
        the possible maximum

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained) and node.master_capable:
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)

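  # Worked example (hypothetical cluster): with 6 nodes of which one is
  # offline and one drained, the other 4 master capable, 2 current
  # candidates and candidate_pool_size=10:
  #
  #   mc_max    = 4             (online, not drained, master capable)
  #   mc_should = min(4, 10) = 4
  #   mc_now    = 2
  #
  # so the returned tuple would be (2, 4, 4).
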
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current, desired and possible master candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    # the second element of the stats tuple is the desired pool size
    # (already capped at the possible maximum), which is what we grow to
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions or not node.master_capable):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("MaintainCandidatePool didn't manage to fill"
                        " the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

  def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
    """Add a given node to the specified group.

    """
    if nodegroup_uuid not in self._config_data.nodegroups:
      # This can happen if a node group gets deleted between its lookup and
      # when we're adding the first node to it, since we don't keep a lock in
      # the meantime. It's ok though, as we'll fail cleanly if the node group
      # is not found anymore.
      raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
    if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
      self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)

  def _UnlockedRemoveNodeFromGroup(self, node):
    """Remove a given node from its group.

    """
    nodegroup = node.group
    if nodegroup not in self._config_data.nodegroups:
      logging.warning("Node '%s' has unknown node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
      # there is nothing else we can do; looking the group up below
      # would only raise a KeyError
      return
    nodegroup_obj = self._config_data.nodegroups[nodegroup]
    if node.name not in nodegroup_obj.members:
      logging.warning("Node '%s' not a member of its node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
    else:
      nodegroup_obj.members.remove(node.name)

  @locking.ssynchronized(_config_lock)
  def AssignGroupNodes(self, mods):
    """Changes the group of a number of nodes.

    @type mods: list of tuples; (node name, new group UUID)
    @param mods: Node membership modifications

    """
    groups = self._config_data.nodegroups
    nodes = self._config_data.nodes

    resmod = []

    # Try to resolve names/UUIDs first
    for (node_name, new_group_uuid) in mods:
      try:
        node = nodes[node_name]
      except KeyError:
        raise errors.ConfigurationError("Unable to find node '%s'" % node_name)

      if node.group == new_group_uuid:
        # Node is being assigned to its current group
        logging.debug("Node '%s' was assigned to its current group (%s)",
                      node_name, node.group)
        continue

      # Try to find current group of node
      try:
        old_group = groups[node.group]
      except KeyError:
        raise errors.ConfigurationError("Unable to find old group '%s'" %
                                        node.group)

      # Try to find new group for node
      try:
        new_group = groups[new_group_uuid]
      except KeyError:
        raise errors.ConfigurationError("Unable to find new group '%s'" %
                                        new_group_uuid)

      assert node.name in old_group.members, \
        ("Inconsistent configuration: node '%s' not listed in members for its"
         " old group '%s'" % (node.name, old_group.uuid))
      assert node.name not in new_group.members, \
        ("Inconsistent configuration: node '%s' already listed in members for"
         " its new group '%s'" % (node.name, new_group.uuid))

      resmod.append((node, old_group, new_group))

    # Apply changes
    for (node, old_group, new_group) in resmod:
      assert node.group != new_group.uuid and old_group.uuid != new_group.uuid, \
        "Assigning to current group is not possible"

      node.group = new_group.uuid

      # Update members of involved groups
      if node.name in old_group.members:
        old_group.members.remove(node.name)
      if node.name not in new_group.members:
        new_group.members.append(node.name)

    # Update timestamps and serials (only once per node/group object)
    now = time.time()
    for obj in frozenset(itertools.chain(*resmod)): # pylint: disable=W0142
      obj.serial_no += 1
      obj.mtime = now

    # Force ssconf update
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()

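  # Usage sketch (node names and the group UUID are hypothetical):
  #
  #   cfg.AssignGroupNodes([
  #     ("node1.example.com", "uuid-of-group-a"),
  #     ("node2.example.com", "uuid-of-group-a"),
  #     ])
  #   # both nodes are moved to group A; the member lists, serials and
  #   # mtimes of the involved objects are updated and the new
  #   # configuration is written and distributed once
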
  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            self._config_data.nodegroups.values() +
            [self._config_data.cluster])

  def _OpenConfig(self, accept_foreign):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, "cluster") or
        not hasattr(data.cluster, "rsahostkeypub")):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    if data.cluster.master_node != self._my_hostname and not accept_foreign:
      msg = ("The configuration denotes node %s as master, while my"
             " hostname is %s; opening a foreign configuration is only"
             " possible in accept_foreign mode" %
             (data.cluster.master_node, self._my_hostname))
      raise errors.ConfigurationError(msg)

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

    self._cfg_id = utils.GetFileID(path=self._cfg_file)

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if not self._config_data.nodegroups:
      default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
      default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
                                            members=[])
      self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
      modified = True
    for node in self._config_data.nodes.values():
      if not node.group:
        node.group = self.LookupNodeGroup(None)
        modified = True
      # This is technically *not* an upgrade, but needs to be done both when
      # nodegroups are being added, and upon normally loading the config,
      # because the members list of a node group is discarded upon
      # serializing/deserializing the object.
      self._UnlockedAddNodeToGroup(node.name, node.group)
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    # TODO: Use dedicated resolver talking to config writer for name resolution
    result = \
      self._GetRpc(addr_list).call_upload_file(node_list, self._cfg_file)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    getents = self._getents()
    try:
      fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
                               close=False, gid=getents.confd_gid, mode=0640)
    except errors.LockError:
      raise errors.ConfigurationError("The configuration file has been"
                                      " modified since the last write, cannot"
                                      " update")
    try:
      self._cfg_id = utils.GetFileID(fd=fd)
    finally:
      os.close(fd)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = self._GetRpc(None).call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
                  self._config_data.nodegroups.values()]
    nodegroups_data = fn(utils.NiceSort(nodegroups))

    ssconf_values = {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NETMASK: str(cluster.master_netmask),
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      constants.SS_NODEGROUPS: nodegroups_data,
      }
    bad_values = [(k, v) for k, v in ssconf_values.items()
                  if not isinstance(v, (str, basestring))]
    if bad_values:
      err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values)
      raise errors.ConfigurationError("Some ssconf key(s) have non-string"
                                      " values: %s" % err)
    return ssconf_values

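  # The resulting dictionary maps ssconf key constants to plain strings,
  # with multi-valued entries newline-joined, e.g. (hypothetical values):
  #
  #   {
  #     constants.SS_CLUSTER_NAME: "cluster.example.com",
  #     constants.SS_MASTER_NODE: "node1.example.com",
  #     constants.SS_NODE_LIST: "node1.example.com\nnode2.example.com",
  #     ...
  #   }
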
  @locking.ssynchronized(_config_lock, shared=1)
  def GetSsconfValues(self):
    """Wrapper using lock around L{_UnlockedGetSsconfValues()}.

    """
    return self._UnlockedGetSsconfValues()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    """
    return self._config_data.cluster.drbd_usermode_helper

  @locking.ssynchronized(_config_lock)
  def SetDRBDHelper(self, drbd_helper):
    """Set DRBD usermode helper.

    """
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the MAC prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check if there is at least one disk of the given type in the
    configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)

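  # Usage sketch: checking whether any configured disk still uses DRBD
  # (constants.LD_DRBD8 is assumed to be the matching disk type constant
  # in this version):
  #
  #   if cfg.HasAnyDiskOfType(constants.LD_DRBD8):
  #     pass  # e.g. refuse to unset the DRBD usermode helper
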
  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetClusterInfo) has been updated and
    the caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument is
    the one whose saving the caller wants to ensure.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node}, L{objects.Instance} or L{objects.NodeGroup}
        existing in the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    elif isinstance(target, objects.NodeGroup):
      test = target in self._config_data.nodegroups.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)

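  # Typical read-modify-save cycle (a sketch; `cfg` is an assumed
  # ConfigWriter instance and any locking by the calling logical unit
  # is implied):
  #
  #   node = cfg.GetNodeInfo("node1.example.com")
  #   node.offline = True
  #   cfg.Update(node, None)
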
  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations.

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)