Revision a4eae71f

b/lib/bootstrap.py
@@ -289,7 +289,7 @@
   InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
   cfg = config.ConfigWriter()
   ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE)
-  cfg.Update(cfg.GetClusterInfo())
+  cfg.Update(cfg.GetClusterInfo(), logging.error)

   # start the master ip
   # TODO: Review rpc call from bootstrap
@@ -482,7 +482,7 @@
   cluster_info.master_node = new_master
   # this will also regenerate the ssconf files, since we updated the
   # cluster info
-  cfg.Update(cluster_info)
+  cfg.Update(cluster_info, logging.error)

   result = rpc.RpcRunner.call_node_start_master(new_master, True, no_voting)
   msg = result.fail_msg
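This revision threads a feedback callable into ConfigWriter.Update() so that errors hit while writing and distributing the configuration are reported back to the caller instead of only being logged on the master node. Bootstrap code runs outside of any job and has no per-opcode feedback channel, so it passes logging.error. A minimal sketch of the calling convention used above (assumes a configured node where the ganeti modules are importable):

    import logging

    from ganeti import config

    cfg = config.ConfigWriter()
    # Outside a job there is no feedback_fn; route any distribution
    # warnings to the node daemon log instead.
    cfg.Update(cfg.GetClusterInfo(), logging.error)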
b/lib/cmdlib.py
@@ -1730,10 +1730,10 @@
                        " correcting: recorded %d, actual %d", idx,
                        instance.name, disk.size, size)
           disk.size = size
-          self.cfg.Update(instance)
+          self.cfg.Update(instance, feedback_fn)
           changed.append((instance.name, idx, size))
         if self._EnsureChildSizes(disk):
-          self.cfg.Update(instance)
+          self.cfg.Update(instance, feedback_fn)
           changed.append((instance.name, idx, disk.size))
     return changed

@@ -1794,7 +1794,7 @@
       cluster = self.cfg.GetClusterInfo()
       cluster.cluster_name = clustername
       cluster.master_ip = ip
-      self.cfg.Update(cluster)
+      self.cfg.Update(cluster, feedback_fn)

       # update the known hosts file
       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
@@ -1988,7 +1988,7 @@
       # we need to update the pool size here, otherwise the save will fail
       _AdjustCandidatePool(self, [])

-    self.cfg.Update(self.cluster)
+    self.cfg.Update(self.cluster, feedback_fn)


 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
@@ -2009,6 +2009,7 @@
     dist_nodes.extend(additional_nodes)
   if myself.name in dist_nodes:
     dist_nodes.remove(myself.name)
+
   # 2. Gather files to distribute
   dist_files = set([constants.ETC_HOSTS,
                     constants.SSH_KNOWN_HOSTS_FILE,
@@ -2058,7 +2059,7 @@
     """Redistribute the configuration.

     """
-    self.cfg.Update(self.cfg.GetClusterInfo())
+    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
     _RedistributeAncillaryFiles(self)


@@ -2958,7 +2959,7 @@
       _RedistributeAncillaryFiles(self)
       self.context.ReaddNode(new_node)
       # make sure we redistribute the config
-      self.cfg.Update(new_node)
+      self.cfg.Update(new_node, feedback_fn)
       # and make sure the new node will not have old files around
       if not new_node.master_candidate:
         result = self.rpc.call_node_demote_from_mc(new_node.name)
@@ -3113,7 +3114,7 @@
           result.append(("offline", "clear offline status due to drain"))

     # this will trigger configuration file update, if needed
-    self.cfg.Update(node)
+    self.cfg.Update(node, feedback_fn)
     # this will trigger job queue propagation or cleanup
     if changed_mc:
       self.context.ReaddNode(node)
@@ -3828,7 +3829,7 @@
     if self.op.os_type is not None:
       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
       inst.os = self.op.os_type
-      self.cfg.Update(inst)
+      self.cfg.Update(inst, feedback_fn)

     _StartInstanceDisks(self, inst, None)
     try:
@@ -4501,7 +4502,7 @@

     instance.primary_node = target_node
     # distribute new instance config to the other nodes
-    self.cfg.Update(instance)
+    self.cfg.Update(instance, feedback_fn)

     # Only start the instance if it's marked as up
     if instance.admin_up:
@@ -4725,7 +4726,7 @@
                                  (",".join(errs),))

     instance.primary_node = target_node
-    self.cfg.Update(instance)
+    self.cfg.Update(instance, feedback_fn)

     self.LogInfo("Removing the disks on the original node")
     _RemoveDisks(self, instance, target_node=source_node)
@@ -4963,7 +4964,7 @@
       self.feedback_fn("* instance running on secondary node (%s),"
                        " updating config" % target_node)
       instance.primary_node = target_node
-      self.cfg.Update(instance)
+      self.cfg.Update(instance, self.feedback_fn)
       demoted_node = source_node
     else:
       self.feedback_fn("* instance confirmed to be running on its"
@@ -5090,7 +5091,7 @@

     instance.primary_node = target_node
     # distribute new instance config to the other nodes
-    self.cfg.Update(instance)
+    self.cfg.Update(instance, self.feedback_fn)

     result = self.rpc.call_finalize_migration(target_node,
                                               instance,
@@ -6040,7 +6041,7 @@

     if self.op.start:
       iobj.admin_up = True
-      self.cfg.Update(iobj)
+      self.cfg.Update(iobj, feedback_fn)
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
@@ -6490,9 +6491,11 @@
     try:
       # Should we replace the secondary node?
       if self.new_node is not None:
-        return self._ExecDrbd8Secondary()
+        fn = self._ExecDrbd8Secondary
       else:
-        return self._ExecDrbd8DiskOnly()
+        fn = self._ExecDrbd8DiskOnly
+
+      return fn(feedback_fn)

     finally:
       # Deactivate the instance disks if we're replacing them on a down instance
@@ -6608,7 +6611,7 @@
           self.lu.LogWarning("Can't remove old LV: %s" % msg,
                              hint="remove unused LVs manually")

-  def _ExecDrbd8DiskOnly(self):
+  def _ExecDrbd8DiskOnly(self, feedback_fn):
     """Replace a disk on the primary or secondary for DRBD 8.

     The algorithm for replace is quite complicated:
@@ -6716,7 +6719,7 @@

       dev.children = new_lvs

-      self.cfg.Update(self.instance)
+      self.cfg.Update(self.instance, feedback_fn)

     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
@@ -6731,7 +6734,7 @@
     self.lu.LogStep(6, steps_total, "Removing old storage")
     self._RemoveOldStorage(self.target_node, iv_names)

-  def _ExecDrbd8Secondary(self):
+  def _ExecDrbd8Secondary(self, feedback_fn):
     """Replace the secondary node for DRBD 8.

     The algorithm for replace is quite complicated:
@@ -6844,7 +6847,7 @@
       dev.logical_id = new_logical_id
       self.cfg.SetDiskID(dev, self.instance.primary_node)

-    self.cfg.Update(self.instance)
+    self.cfg.Update(self.instance, feedback_fn)

     # and now perform the drbd attach
     self.lu.LogInfo("Attaching primary drbds to new secondary"
@@ -7015,7 +7018,7 @@
       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
       result.Raise("Grow request failed to node %s" % node)
     disk.RecordGrow(self.op.amount)
-    self.cfg.Update(instance)
+    self.cfg.Update(instance, feedback_fn)
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, instance)
       if disk_abort:
@@ -7685,7 +7688,7 @@
       for key, val in self.op.beparams.iteritems():
         result.append(("be/%s" % key, val))

-    self.cfg.Update(instance)
+    self.cfg.Update(instance, feedback_fn)

     return result

@@ -8088,7 +8091,7 @@
     except errors.TagError, err:
       raise errors.OpExecError("Error while setting tag: %s" % str(err))
     try:
-      self.cfg.Update(self.target)
+      self.cfg.Update(self.target, feedback_fn)
     except errors.ConfigurationError:
       raise errors.OpRetryError("There has been a modification to the"
                                 " config file and the operation has been"
@@ -8127,7 +8130,7 @@
     for tag in self.op.tags:
       self.target.RemoveTag(tag)
     try:
-      self.cfg.Update(self.target)
+      self.cfg.Update(self.target, feedback_fn)
     except errors.ConfigurationError:
       raise errors.OpRetryError("There has been a modification to the"
                                 " config file and the operation has been"
b/lib/config.py
@@ -1145,7 +1145,7 @@
     if modified:
       self._WriteConfig()

-  def _DistributeConfig(self):
+  def _DistributeConfig(self, feedback_fn):
     """Distribute the configuration to the other nodes.

     Currently, this only copies the configuration file. In the future,
@@ -1154,6 +1154,7 @@
     """
     if self._offline:
       return True
+
     bad = False

     node_list = []
@@ -1180,13 +1181,20 @@
         msg = ("Copy of file %s to node %s failed: %s" %
                (self._cfg_file, to_node, msg))
         logging.error(msg)
+
+        if feedback_fn:
+          feedback_fn(msg)
+
         bad = True
+
     return not bad

-  def _WriteConfig(self, destination=None):
+  def _WriteConfig(self, destination=None, feedback_fn=None):
     """Write the configuration data to persistent storage.

     """
+    assert feedback_fn is None or callable(feedback_fn)
+
     # first, cleanup the _temporary_ids set, if an ID is now in the
     # other objects it should be discarded to prevent unbounded growth
     # of that structure
@@ -1206,7 +1214,7 @@
     self.write_count += 1

     # and redistribute the config file to master candidates
-    self._DistributeConfig()
+    self._DistributeConfig(feedback_fn)

     # Write ssconf files on all nodes (including locally)
     if self._last_cluster_serial < self._config_data.cluster.serial_no:
@@ -1214,11 +1222,17 @@
         result = rpc.RpcRunner.call_write_ssconf_files(
           self._UnlockedGetNodeList(),
           self._UnlockedGetSsconfValues())
+
         for nname, nresu in result.items():
           msg = nresu.fail_msg
           if msg:
-            logging.warning("Error while uploading ssconf files to"
-                            " node %s: %s", nname, msg)
+            errmsg = ("Error while uploading ssconf files to"
+                      " node %s: %s" % (nname, msg))
+            logging.warning(errmsg)
+
+            if feedback_fn:
+              feedback_fn(errmsg)
+
       self._last_cluster_serial = self._config_data.cluster.serial_no

   def _UnlockedGetSsconfValues(self):
@@ -1302,7 +1316,7 @@
     return self._config_data.cluster

   @locking.ssynchronized(_config_lock)
-  def Update(self, target):
+  def Update(self, target, feedback_fn):
     """Notify function to be called after updates.

     This function must be called when an object (as returned by
@@ -1314,6 +1328,7 @@
     @param target: an instance of either L{objects.Cluster},
         L{objects.Node} or L{objects.Instance} which is existing in
         the cluster
+    @param feedback_fn: Callable feedback function

     """
     if self._config_data is None:
@@ -1346,4 +1361,4 @@
       for nic in target.nics:
         self._temporary_macs.discard(nic.mac)

-    self._WriteConfig()
+    self._WriteConfig(feedback_fn=feedback_fn)
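Within ConfigWriter the new argument is plumbed from Update() through _WriteConfig() (which defaults it to None and asserts it is None or callable) into _DistributeConfig() and the ssconf push, where failure messages are now both logged and, when a callable was given, echoed through it. A self-contained sketch of that contract (fake_distribute is a stand-in, not Ganeti code):

    import logging

    def fake_distribute(feedback_fn=None):
      # feedback_fn is either None or any callable taking one message string
      assert feedback_fn is None or callable(feedback_fn)
      msg = "Copy of file config.data to node node2.example.com failed: timeout"
      logging.error(msg)        # always logged, as before this revision
      if feedback_fn:
        feedback_fn(msg)        # additionally surfaced to the caller

    messages = []
    fake_distribute()                 # old behaviour: log only
    fake_distribute(logging.error)    # bootstrap-style callable
    fake_distribute(messages.append)  # job-style: caller collects the warning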
b/test/ganeti.config_unittest.py
@@ -108,15 +108,15 @@
     # construct a fake cluster object
     fake_cl = objects.Cluster()
     # fail if we didn't read the config
-    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl)
+    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl, None)

     cl = cfg.GetClusterInfo()
     # first pass, must not fail
-    cfg.Update(cl)
+    cfg.Update(cl, None)
     # second pass, also must not fail (after the config has been written)
-    cfg.Update(cl)
+    cfg.Update(cl, None)
     # but the fake_cl update should still fail
-    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl)
+    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_cl, None)

   def testUpdateNode(self):
     """Test updates on one node object"""
@@ -124,15 +124,17 @@
     # construct a fake node
     fake_node = objects.Node()
     # fail if we didn't read the config
-    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node)
+    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node,
+                          None)

     node = cfg.GetNodeInfo(cfg.GetNodeList()[0])
     # first pass, must not fail
-    cfg.Update(node)
+    cfg.Update(node, None)
     # second pass, also must not fail (after the config has been written)
-    cfg.Update(node)
+    cfg.Update(node, None)
     # but the fake_node update should still fail
-    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node)
+    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_node,
+                          None)

   def testUpdateInstance(self):
     """Test updates on one instance object"""
@@ -141,16 +143,18 @@
     inst = self._create_instance()
     fake_instance = objects.Instance()
     # fail if we didn't read the config
-    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance)
+    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance,
+                          None)

     cfg.AddInstance(inst)
     instance = cfg.GetInstanceInfo(cfg.GetInstanceList()[0])
     # first pass, must not fail
-    cfg.Update(instance)
+    cfg.Update(instance, None)
     # second pass, also must not fail (after the config has been written)
-    cfg.Update(instance)
+    cfg.Update(instance, None)
     # but the fake_instance update should still fail
-    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance)
+    self.failUnlessRaises(errors.ConfigurationError, cfg.Update, fake_instance,
+                          None)

   def testNICParameterSyntaxCheck(self):
     """Test the NIC's CheckParameterSyntax function"""
