Revision 7e5a6e86 daemons/ganeti-masterd

b/daemons/ganeti-masterd
34 34
import pwd
35 35
import sys
36 36
import socket
37
import SocketServer
38 37
import time
39 38
import tempfile
40
import collections
41 39
import logging
42 40

  
43 41
from optparse import OptionParser
......
66 64

  
67 65
class ClientRequestWorker(workerpool.BaseWorker):
68 66
   # pylint: disable-msg=W0221
69
  def RunTask(self, server, request, client_address):
67
  def RunTask(self, server, message, client):
70 68
    """Process the request.
71 69

  
72 70
    """
71
    client_ops = ClientOps(server)
72

  
73 73
    try:
74
      server.request_handler_class(request, client_address, server)
75
    finally:
76
      request.close()
74
      (method, args) = luxi.ParseRequest(message)
75
    except luxi.ProtocolError, err:
76
      logging.error("Protocol Error: %s", err)
77
      client.close_log()
78
      return
79

  
80
    success = False
81
    try:
82
      result = client_ops.handle_request(method, args)
83
      success = True
84
    except errors.GenericError, err:
85
      logging.exception("Unexpected exception")
86
      success = False
87
      result = errors.EncodeException(err)
88
    except:
89
      logging.exception("Unexpected exception")
90
      err = sys.exc_info()
91
      result = "Caught exception: %s" % str(err[1])
92

  
93
    try:
94
      reply = luxi.FormatResponse(success, result)
95
      client.send_message(reply)
96
      # awake the main thread so that it can write out the data.
97
      server.awaker.signal()
98
    except:
99
      logging.exception("Send error")
100
      client.close_log()
101

  
102

  
103
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
104
  """Handler for master peers.
105

  
106
  """
107
  _MAX_UNHANDLED = 1
108
  def __init__(self, server, connected_socket, client_address, family):
109
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
110
                                                 client_address,
111
                                                 constants.LUXI_EOM,
112
                                                 family, self._MAX_UNHANDLED)
113
    self.server = server
114

  
115
  def handle_message(self, message, _):
116
    self.server.request_workers.AddTask(self.server, message, self)
77 117

  
78 118

  
79 119
class MasterServer(daemon.AsyncStreamServer):
......
83 123
  master socket.
84 124

  
85 125
  """
86
  def __init__(self, mainloop, address, handler_class, uid, gid):
126
  family = socket.AF_UNIX
127

  
128
  def __init__(self, mainloop, address, uid, gid):
87 129
    """MasterServer constructor
88 130

  
89 131
    @type mainloop: ganeti.daemon.Mainloop
90 132
    @param mainloop: Mainloop used to poll for I/O events
91 133
    @param address: the unix socket address to bind the MasterServer to
92
    @param handler_class: handler class for the connections
93 134
    @param uid: The uid of the owner of the socket
94 135
    @param gid: The gid of the owner of the socket
95 136

  
96 137
    """
97 138
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
98
    daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, temp_name)
139
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
99 140
    os.chmod(temp_name, 0770)
100 141
    os.chown(temp_name, uid, gid)
101 142
    os.rename(temp_name, address)
102 143

  
103
    self.request_handler_class = handler_class
104 144
    self.mainloop = mainloop
145
    self.awaker = daemon.AsyncAwaker()
105 146

  
106 147
    # We'll only start threads once we've forked.
107 148
    self.context = None
108 149
    self.request_workers = None
109 150

  
110 151
  def handle_connection(self, connected_socket, client_address):
111
    self.request_workers.AddTask(self, connected_socket, client_address)
152
    # TODO: add connection count and limit the number of open connections to a
153
    # maximum number to avoid breaking for lack of file descriptors or memory.
154
    MasterClientHandler(self, connected_socket, client_address, self.family)
112 155

  
113 156
  def setup_queue(self):
114 157
    self.context = GanetiContext()
......
132 175
        self.context.jobqueue.Shutdown()
133 176

  
134 177

  
135
class ClientRqHandler(SocketServer.BaseRequestHandler):
136
  """Client handler"""
137
  READ_SIZE = 4096
138

  
139
  def setup(self):
140
    # pylint: disable-msg=W0201
141
    # setup() is the api for initialising for this class
142
    self._buffer = ""
143
    self._msgs = collections.deque()
144
    self._ops = ClientOps(self.server)
145

  
146
  def handle(self):
147
    while True:
148
      msg = self.read_message()
149
      if msg is None:
150
        logging.debug("client closed connection")
151
        break
152

  
153
      (method, args) = luxi.ParseRequest(msg)
154

  
155
      success = False
156
      try:
157
        result = self._ops.handle_request(method, args)
158
        success = True
159
      except errors.GenericError, err:
160
        logging.exception("Unexpected exception")
161
        result = errors.EncodeException(err)
162
      except:
163
        logging.exception("Unexpected exception")
164
        result = "Caught exception: %s" % str(sys.exc_info()[1])
165

  
166
      self.send_message(luxi.FormatResponse(success, result))
167

  
168
  def read_message(self):
169
    while not self._msgs:
170
      data = self.request.recv(self.READ_SIZE)
171
      if not data:
172
        return None
173
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
174
      self._buffer = new_msgs.pop()
175
      self._msgs.extend(new_msgs)
176
    return self._msgs.popleft()
177

  
178
  def send_message(self, msg):
179
    # TODO: sendall is not guaranteed to send everything
180
    self.request.sendall(msg + constants.LUXI_EOM)
181

  
182

  
183 178
class ClientOps:
184 179
  """Class holding high-level client operations."""
185 180
  def __init__(self, server):
......
526 521
  utils.RemoveFile(constants.MASTER_SOCKET)
527 522

  
528 523
  mainloop = daemon.Mainloop()
529
  master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler,
524
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
530 525
                        options.uid, options.gid)
531 526
  try:
532 527
    rpc.Init()

Also available in: Unified diff