import os
import sys
import logging
-import pyinotify
import time
+try:
+ from pyinotify import pyinotify
+except ImportError:
+ import pyinotify
+
from optparse import OptionParser
from ganeti import asyncnotifier
# and now put it under $prefix on the target node(s)
for host; do
echo Uploading code to ${host}...
- rsync -v -rlDc --exclude="*.py[oc]" --exclude="*.pdf" --exclude="*.html" \
+ rsync -v -rlDc \
+ -e "ssh -oBatchMode=yes" \
+ --exclude="*.py[oc]" --exclude="*.pdf" --exclude="*.html" \
"$TXD/" \
root@${host}:/ &
done
if test -z "${NO_RESTART}"; then
for host; do
echo Restarting ganeti-noded on ${host}...
- ssh root@${host} /etc/init.d/ganeti restart &
+ ssh -oBatchMode=yes root@${host} /etc/init.d/ganeti restart &
done
wait
fi
"""Asynchronous pyinotify implementation"""
-import pyinotify
import asyncore
+try:
+ from pyinotify import pyinotify
+except ImportError:
+ import pyinotify
+
class AsyncNotifier(asyncore.file_dispatcher):
"""An asyncore dispatcher for inotify events.
tmpr.append("The procfs filesystem doesn't seem to be mounted"
" under /proc, missing required directory /proc/sys and"
" the file /proc/sysrq-trigger")
+
+ if constants.NV_TIME in what:
+ result[constants.NV_TIME] = utils.SplitTime(time.time())
+
return result
from ganeti import objects
+# Size of reads in _CanReadDevice
+_DEVICE_READ_SIZE = 128 * 1024
+
+
def _IgnoreError(fn, *args, **kwargs):
"""Executes the given function, ignoring BlockDeviceErrors.
raise errors.BlockDeviceError(msg)
+def _CanReadDevice(path):
+  """Check if we can read from the given device.
+
+  This tries to read the first _DEVICE_READ_SIZE bytes (128k) of the device.
+
+  @type path: str
+  @param path: path of the device to read from
+  @rtype: boolean
+  @return: True if the read succeeded, False if it raised an
+      EnvironmentError (which is logged as a warning, with traceback)
+
+  """
+  try:
+    utils.ReadFile(path, size=_DEVICE_READ_SIZE)
+    return True
+  except EnvironmentError:
+    # exc_info=True already records the exception, no need to bind it
+    logging.warning("Can't read from device %s", path, exc_info=True)
+    return False
+
+
class BlockDev(object):
"""Block device abstract class.
def __init__(self, unique_id, children, size):
if children and children.count(None) > 0:
children = []
+ if len(children) not in (0, 2):
+ raise ValueError("Invalid configuration data %s" % str(children))
+ if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 6:
+ raise ValueError("Invalid configuration data %s" % str(unique_id))
+ (self._lhost, self._lport,
+ self._rhost, self._rport,
+ self._aminor, self._secret) = unique_id
+ if children:
+ if not _CanReadDevice(children[1].dev_path):
+ logging.info("drbd%s: Ignoring unreadable meta device", self._aminor)
+ children = []
super(DRBD8, self).__init__(unique_id, children, size)
self.major = self._DRBD_MAJOR
version = self._GetVersion()
" usage: kernel is %s.%s, ganeti wants 8.x",
version['k_major'], version['k_minor'])
- if len(children) not in (0, 2):
- raise ValueError("Invalid configuration data %s" % str(children))
- if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 6:
- raise ValueError("Invalid configuration data %s" % str(unique_id))
- (self._lhost, self._lport,
- self._rhost, self._rport,
- self._aminor, self._secret) = unique_id
if (self._lhost is not None and self._lhost == self._rhost and
self._lport == self._rport):
raise ValueError("Invalid configuration data, same local/remote %s" %
ENODESSH = (TNODE, "ENODESSH")
ENODEVERSION = (TNODE, "ENODEVERSION")
ENODESETUP = (TNODE, "ENODESETUP")
+ ENODETIME = (TNODE, "ENODETIME")
ETYPE_FIELD = "code"
ETYPE_ERROR = "ERROR"
constants.NV_VERSION: None,
constants.NV_HVINFO: self.cfg.GetHypervisorType(),
constants.NV_NODESETUP: None,
+ constants.NV_TIME: None,
}
+
if vg_name is not None:
node_verify_param[constants.NV_VGLIST] = None
node_verify_param[constants.NV_LVLIST] = vg_name
node_verify_param[constants.NV_PVLIST] = [vg_name]
node_verify_param[constants.NV_DRBDLIST] = None
+
+ # Due to the way our RPC system works, exact response times cannot be
+ # guaranteed (e.g. a broken node could run into a timeout). By keeping the
+ # time before and after executing the request, we can at least have a time
+ # window.
+ nvinfo_starttime = time.time()
all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
self.cfg.GetClusterName())
+ nvinfo_endtime = time.time()
cluster = self.cfg.GetClusterInfo()
master_node = self.cfg.GetMasterNode()
else:
instance = instanceinfo[instance]
node_drbd[minor] = (instance.name, instance.admin_up)
+
self._VerifyNode(node_i, file_names, local_checksums,
nresult, master_files, node_drbd, vg_name)
if test:
continue
+      # Node time
+      ntime = nresult.get(constants.NV_TIME, None)
+      try:
+        ntime_merged = utils.MergeTime(ntime)
+      except (ValueError, TypeError):
+        # "test" is always false at this point (the preceding "if test:
+        # continue" guarantees it), so the error must be raised
+        # unconditionally. The node must also be skipped: otherwise
+        # ntime_merged is unbound (first node) or still holds the previous
+        # node's value in the comparisons below.
+        _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
+        continue
+
+      # The exact moment the node sampled its clock is unknown, so it is
+      # compared against the [nvinfo_starttime, nvinfo_endtime] window of
+      # the RPC call, widened on both sides by NODE_MAX_CLOCK_SKEW.
+      if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
+        ntime_diff = abs(nvinfo_starttime - ntime_merged)
+      elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
+        ntime_diff = abs(ntime_merged - nvinfo_endtime)
+      else:
+        ntime_diff = None
+
+      _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
+               "Node time diverges by at least %0.1fs from master node time",
+               ntime_diff)
+
+      if ntime_diff is not None:
+        continue
+
try:
node_info[node] = {
"mfree": int(nodeinfo['memory_free']),
self.cfg.SetDiskID(disk, node)
result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
result.Raise("Grow request failed to node %s" % node)
+
+ # TODO: Rewrite code to work properly
+ # DRBD goes into sync mode for a short amount of time after executing the
+ # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
+ # calling "resize" in sync mode fails. Sleeping for a short amount of
+ # time is a work-around.
+ time.sleep(5)
+
disk.RecordGrow(self.op.amount)
self.cfg.Update(instance, feedback_fn)
if self.op.wait_for_sync:
It accepts no arguments, and returns the current cluster master.
"""
+  def _GetMasterNode(self):
+    """Returns the current cluster master, as known to the config reader."""
+    reader = self.reader
+    return reader.GetMasterNode()
+
def Exec(self, query):
"""ClusterMasterQuery main execution
+
+    @param query: either an empty value (e.g. None), to get the current
+        master, or a dict whose constants.CONFD_REQQ_FIELDS entry is a
+        list/tuple of constants.CONFD_REQFIELD_* identifiers
+    @return: a (status, answer) tuple; for a FIELDS query the answer is a
+        list with one value per requested field, in request order.
+        QUERY_ARGUMENT_ERROR is returned for any malformed query,
+        including a FIELDS list containing an unknown identifier.
+
"""
- if query is None:
+    if isinstance(query, dict):
+      if constants.CONFD_REQQ_FIELDS not in query:
+        logging.debug("missing FIELDS in query dict")
+        return QUERY_ARGUMENT_ERROR
+
+      req_fields = query[constants.CONFD_REQQ_FIELDS]
+      if not isinstance(req_fields, (list, tuple)):
+        logging.debug("FIELDS request should be a list")
+        return QUERY_ARGUMENT_ERROR
+
+      status = constants.CONFD_REPL_STATUS_OK
+      answer = []
+      for field in req_fields:
+        if field == constants.CONFD_REQFIELD_NAME:
+          answer.append(self._GetMasterNode())
+        elif field == constants.CONFD_REQFIELD_IP:
+          answer.append(self.reader.GetMasterIP())
+        elif field == constants.CONFD_REQFIELD_MNODE_PIP:
+          answer.append(self.reader.GetNodePrimaryIp(self._GetMasterNode()))
+        else:
+          # Silently skipping an unknown field would shift every later
+          # value to the wrong position in "answer"; reject the query.
+          logging.debug("Unknown field %r in FIELDS request", field)
+          return QUERY_ARGUMENT_ERROR
+    elif not query:
status = constants.CONFD_REPL_STATUS_OK
answer = self.reader.GetMasterNode()
else:
- status = constants.CONFD_REPL_STATUS_ERROR
- answer = 'master query accepts no query argument'
+      logging.debug("Invalid master query argument: not dict or empty")
+      return QUERY_ARGUMENT_ERROR
return status, answer
LVM_STRIPECOUNT = _autoconf.LVM_STRIPECOUNT
# default maximum instance wait time, in seconds.
DEFAULT_SHUTDOWN_TIMEOUT = 120
-
+# Maximum clock skew, in seconds, tolerated between a node and the master
+# node before node verification reports an ENODETIME error
+NODE_MAX_CLOCK_SKEW = 150
# RPC constants
(RPC_ENCODING_NONE,
NV_PVLIST = "pvlist"
NV_DRBDLIST = "drbd-list"
NV_NODESETUP = "nodesetup"
+NV_TIME = "time"
# Allocator framework constants
IALLOCATOR_VERSION = 2
CONFD_REQQ_LINK = "0"
CONFD_REQQ_IP = "1"
CONFD_REQQ_IPLIST = "2"
+CONFD_REQQ_FIELDS = "3"  # query dict key holding the list of requested fields
+
+# Field identifiers accepted in the CONFD_REQQ_FIELDS list of a cluster
+# master query; each one is answered with:
+CONFD_REQFIELD_NAME = "0"  # the current master node
+CONFD_REQFIELD_IP = "1"  # the master IP
+CONFD_REQFIELD_MNODE_PIP = "2"  # the primary IP of the master node
CONFD_REQS = frozenset([
CONFD_REQ_PING,
# Each request is "salted" by the current timestamp.
# This constants decides how many seconds of skew to accept.
# TODO: make this a default and allow the value to be more configurable
-CONFD_MAX_CLOCK_SKEW = 300
+CONFD_MAX_CLOCK_SKEW = 2 * NODE_MAX_CLOCK_SKEW
# When we haven't reloaded the config for more than this amount of seconds, we
# force a test to see if inotify is betraying us.
script.write(" # Connect the interface to the bridge\n")
script.write(" /usr/sbin/brctl addif $BRIDGE $INTERFACE\n")
elif nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_ROUTED:
+ if not nic.ip:
+ raise errors.HypervisorError("nic/%d is routed, but has no ip." % seq)
script.write(" # Route traffic targeted at the IP to the interface\n")
if nic.nicparams[constants.NIC_LINK]:
script.write(" while /sbin/ip rule del dev $INTERFACE; do :; done\n")
raise errors.GenericError("%s is not a directory" % dir_name)
-def ReadFile(file_name, size=None):
+def ReadFile(file_name, size=-1):
"""Reads a file.
- @type size: None or int
- @param size: Read at most size bytes
+  @type size: int (None is still accepted for backwards compatibility)
+  @param size: Read at most size bytes (if negative or None, entire file)
@rtype: str
@return: the (possibly partial) content of the file
"""
f = open(file_name, "r")
try:
- if size is None:
- return f.read()
- else:
- return f.read(size)
+    if size is None:
+      # The previous signature documented None as "read everything", but
+      # Python 2's file.read(None) raises TypeError; keep old callers working
+      size = -1
+    return f.read(size)
finally:
f.close()
self.failUnless(constants.LDS_OKAY < constants.LDS_UNKNOWN)
self.failUnless(constants.LDS_UNKNOWN < constants.LDS_FAULTY)
+  def testClockSkew(self):
+    """The node clock skew limit must stay below 80% of confd's limit."""
+    confd_limit = 0.8 * constants.CONFD_MAX_CLOCK_SKEW
+    self.failUnless(constants.NODE_MAX_CLOCK_SKEW < confd_limit)
+
class TestParameterNames(unittest.TestCase):
"""HV/BE parameter tests"""
def testFourCc(self):
self.failUnlessEqual(len(constants.CONFD_MAGIC_FOURCC), 4,
- "Invalid fourcc len, should be 4")
+ "Invalid fourcc len, should be 4")
def _IsUniqueSequence(self, sequence):
seen = set()