Revision e58f87a9 lib/cmdlib.py

b/lib/cmdlib.py
36 36
import logging
37 37
import copy
38 38
import OpenSSL
39
import socket
40
import tempfile
41
import shutil
39 42

  
40 43
from ganeti import ssh
41 44
from ganeti import utils
......
9848 9851
        self._TestDelay()
9849 9852

  
9850 9853

  
9854
class LUTestJobqueue(NoHooksLU):
9855
  """Utility LU to test some aspects of the job queue.
9856

  
9857
  """
9858
  _OP_PARAMS = [
9859
    ("notify_waitlock", False, _TBool),
9860
    ("notify_exec", False, _TBool),
9861
    ("log_messages", _EmptyList, _TListOf(_TString)),
9862
    ("fail", False, _TBool),
9863
    ]
9864
  REQ_BGL = False
9865

  
9866
  # Must be lower than default timeout for WaitForJobChange to see whether it
9867
  # notices changed jobs
9868
  _CLIENT_CONNECT_TIMEOUT = 20.0
9869
  _CLIENT_CONFIRM_TIMEOUT = 60.0
9870

  
9871
  @classmethod
9872
  def _NotifyUsingSocket(cls, cb, errcls):
9873
    """Opens a Unix socket and waits for another program to connect.
9874

  
9875
    @type cb: callable
9876
    @param cb: Callback to send socket name to client
9877
    @type errcls: class
9878
    @param errcls: Exception class to use for errors
9879

  
9880
    """
9881
    # Using a temporary directory as there's no easy way to create temporary
9882
    # sockets without writing a custom loop around tempfile.mktemp and
9883
    # socket.bind
9884
    tmpdir = tempfile.mkdtemp()
9885
    try:
9886
      tmpsock = utils.PathJoin(tmpdir, "sock")
9887

  
9888
      logging.debug("Creating temporary socket at %s", tmpsock)
9889
      sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
9890
      try:
9891
        sock.bind(tmpsock)
9892
        sock.listen(1)
9893

  
9894
        # Send details to client
9895
        cb(tmpsock)
9896

  
9897
        # Wait for client to connect before continuing
9898
        sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
9899
        try:
9900
          (conn, _) = sock.accept()
9901
        except socket.error, err:
9902
          raise errcls("Client didn't connect in time (%s)" % err)
9903
      finally:
9904
        sock.close()
9905
    finally:
9906
      # Remove as soon as client is connected
9907
      shutil.rmtree(tmpdir)
9908

  
9909
    # Wait for client to close
9910
    try:
9911
      try:
9912
        conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
9913
        conn.recv(1)
9914
      except socket.error, err:
9915
        raise errcls("Client failed to confirm notification (%s)" % err)
9916
    finally:
9917
      conn.close()
9918

  
9919
  def _SendNotification(self, test, arg, sockname):
9920
    """Sends a notification to the client.
9921

  
9922
    @type test: string
9923
    @param test: Test name
9924
    @param arg: Test argument (depends on test)
9925
    @type sockname: string
9926
    @param sockname: Socket path
9927

  
9928
    """
9929
    self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
9930

  
9931
  def _Notify(self, prereq, test, arg):
9932
    """Notifies the client of a test.
9933

  
9934
    @type prereq: bool
9935
    @param prereq: Whether this is a prereq-phase test
9936
    @type test: string
9937
    @param test: Test name
9938
    @param arg: Test argument (depends on test)
9939

  
9940
    """
9941
    if prereq:
9942
      errcls = errors.OpPrereqError
9943
    else:
9944
      errcls = errors.OpExecError
9945

  
9946
    return self._NotifyUsingSocket(compat.partial(self._SendNotification,
9947
                                                  test, arg),
9948
                                   errcls)
9949

  
9950
  def CheckArguments(self):
9951
    self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
9952
    self.expandnames_calls = 0
9953

  
9954
  def ExpandNames(self):
9955
    checkargs_calls = getattr(self, "checkargs_calls", 0)
9956
    if checkargs_calls < 1:
9957
      raise errors.ProgrammerError("CheckArguments was not called")
9958

  
9959
    self.expandnames_calls += 1
9960

  
9961
    if self.op.notify_waitlock:
9962
      self._Notify(True, constants.JQT_EXPANDNAMES, None)
9963

  
9964
    self.LogInfo("Expanding names")
9965

  
9966
    # Get lock on master node (just to get a lock, not for a particular reason)
9967
    self.needed_locks = {
9968
      locking.LEVEL_NODE: self.cfg.GetMasterNode(),
9969
      }
9970

  
9971
  def Exec(self, feedback_fn):
9972
    if self.expandnames_calls < 1:
9973
      raise errors.ProgrammerError("ExpandNames was not called")
9974

  
9975
    if self.op.notify_exec:
9976
      self._Notify(False, constants.JQT_EXEC, None)
9977

  
9978
    self.LogInfo("Executing")
9979

  
9980
    if self.op.log_messages:
9981
      for idx, msg in enumerate(self.op.log_messages):
9982
        self.LogInfo("Sending log message %s", idx + 1)
9983
        feedback_fn(constants.JQT_MSGPREFIX + msg)
9984
        # Report how many test messages have been sent
9985
        self._Notify(False, constants.JQT_LOGMSG, idx + 1)
9986

  
9987
    if self.op.fail:
9988
      raise errors.OpExecError("Opcode failure was requested")
9989

  
9990
    return True
9991

  
9992

  
9851 9993
class IAllocator(object):
9852 9994
  """IAllocator framework.
9853 9995

  

Also available in: Unified diff