lib/config.py @ 099c52ad

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was slow and consumed too
much memory.

"""

import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer


_config_lock = locking.SharedLock()
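# All ConfigWriter methods serialize on this lock: read-only accessors take
# it shared via locking.ssynchronized(_config_lock, shared=1), while mutating
# operations take it exclusively.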


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))


class ConfigWriter:
  """The interface to the cluster configuration.

  """
  def __init__(self, cfg_file=None, offline=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._temporary_ids = set()
    self._temporary_drbds = {}
    self._temporary_macs = set()
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = utils.HostInfo().name
    self._last_cluster_serial = -1
    self._OpenConfig()

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    prefix = self._config_data.cluster.mac_prefix
    all_macs = self._AllMACs()
    retries = 64
    while retries > 0:
      byte1 = random.randrange(0, 256)
      byte2 = random.randrange(0, 256)
      byte3 = random.randrange(0, 256)
      mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
      if mac not in all_macs and mac not in self._temporary_macs:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Can't generate unique MAC")
    self._temporary_macs.add(mac)
    return mac
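  # Example of the generated format: with mac_prefix "aa:00:00", a result
  # might look like "aa:00:00:3f:9a:01" (the prefix plus three random bytes).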

  @locking.ssynchronized(_config_lock, shared=1)
  def IsMacInUse(self, mac):
    """Predicate: check if the specified MAC is in use in the Ganeti cluster.

    This only checks instances managed by this cluster; it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    return mac in all_macs or mac in self._temporary_macs

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    all_secrets = self._AllDRBDSecrets()
    retries = 64
    while retries > 0:
      secret = utils.GenerateSecret()
      if secret not in all_secrets:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Can't generate unique DRBD secret")
    return secret

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids)
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, exceptions=None):
    """Generate a unique UUID.

    This checks the current node, instances and disk names for
    duplicates.

    @param exceptions: a list with some other names which should be
        checked for uniqueness (used for example when you want to get
        more than one id at one time without adding each one in turn
        to the config file)

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=True)
    if exceptions is not None:
      existing.update(exceptions)
    retries = 64
    while retries > 0:
      unique_id = utils.NewUUID()
      if unique_id not in existing and unique_id is not None:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Unable to generate a unique ID"
                                      " (last tried ID: %s)" % unique_id)
    self._temporary_ids.add(unique_id)
    return unique_id

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, exceptions=None):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    """
    return self._GenerateUniqueID(exceptions=exceptions)

  def _CleanupTemporaryIDs(self):
    """Cleans up the _temporary_ids structure.

    """
    existing = self._AllIDs(include_temporary=False)
    self._temporary_ids = self._temporary_ids - existing

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not data.cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)

    if data.cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    data.cluster.master_node)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = ", ".join(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node in data.nodes.values():
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drained=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))

    # drbd minors check
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device does not know node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates
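  # Shape of the returned value, for illustration:
  #   ({"node1.example.com": {0: "inst1", 1: "inst2"},
  #     "node2.example.com": {}},
  #    [])
  # where the list holds (node, minor, instance_a, instance_b) tuples for
  # doubly-assigned minors.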

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
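  # Note: utils.FirstFree is assumed to return the first gap in the sorted
  # key list (e.g. [0, 1, 3] -> 2) and None when there is no gap, in which
  # case the minor after the highest used one is taken.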

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths; on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    for nic in instance.nics:
      self._temporary_macs.discard(nic.mac)
    self._WriteConfig()

  def _EnsureUUID(self, item):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID()
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert isinstance(status, bool), \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_up != status:
      instance.admin_up = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance as up in the configuration.

    """
    self._SetInstanceStatus(instance_name, True)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              os.path.join(file_storage_dir,
                                                           inst.name,
                                                           disk.iv_name))

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the instance as down in the configuration.

    """
    self._SetInstanceStatus(instance_name, False)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.instances.keys(),
                                    case_sensitive=False)

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    about an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
        GetInstanceInfo would return for that instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock)
  def AddNode(self, node):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys(),
                                    case_sensitive=False)

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]


  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()


  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
        GetNodeInfo would return for that node

    """
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                    for node in self._UnlockedGetNodeList()])
    return my_dict

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum desired and possible candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired and possible, possible)

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained):
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)
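  # For example, an 8-node cluster with candidate_pool_size 5, three current
  # candidates and no offline or drained nodes yields (3, 5, 8).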

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum possible candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, max)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            [self._config_data.cluster])

  def _OpenConfig(self):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, so it needs
        to either be called with the lock held or from a safe place
        (the constructor)

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID()
        modified = True
    if modified:
      self._WriteConfig()

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # First, clean up the _temporary_ids set; if an ID is now in the
    # other objects it should be discarded to prevent unbounded growth
    # of that structure
    self._CleanupTemporaryIDs()

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (", ".join(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    utils.WriteFile(destination, data=txt)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = rpc.RpcRunner.call_write_ssconf_files(
          self._UnlockedGetNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())
    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      }

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the mac prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetClusterInfo) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure is saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which exists in
        the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)
      for nic in target.nics:
        self._temporary_macs.discard(nic.mac)

    self._WriteConfig(feedback_fn=feedback_fn)
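

if __name__ == "__main__":
  # Minimal usage sketch (assumes a valid cluster configuration file at the
  # default location and a resolvable local hostname; offline=True avoids
  # distributing anything to other nodes).
  cfg = ConfigWriter(offline=True)
  print "Cluster name: %s" % cfg.GetClusterName()
  print "Master node: %s" % cfg.GetMasterNode()
  verify_errors = cfg.VerifyConfig()
  if verify_errors:
    print "Configuration errors: %s" % ", ".join(verify_errors)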