Revision cdd7f900 daemons/ganeti-masterd

b/daemons/ganeti-masterd
30 30
# C0103: Invalid name ganeti-masterd
31 31

  
32 32
import sys
33
import socket
33 34
import SocketServer
34 35
import time
35 36
import collections
36
import signal
37 37
import logging
38 38

  
39 39
from optparse import OptionParser
......
65 65
  def RunTask(self, server, request, client_address):
66 66
    """Process the request.
67 67

  
68
    This is copied from the code in ThreadingMixIn.
69

  
70 68
    """
71 69
    try:
72
      server.finish_request(request, client_address)
73
      server.close_request(request)
74
    except: # pylint: disable-msg=W0702
75
      server.handle_error(request, client_address)
76
      server.close_request(request)
70
      server.request_handler_class(request, client_address, server)
71
    finally:
72
      request.close()
77 73

  
78 74

  
79
class IOServer(SocketServer.UnixStreamServer):
80
  """IO thread class.
75
class MasterServer(daemon.AsyncStreamServer):
76
  """Master Server.
81 77

  
82
  This class takes care of initializing the other threads, setting
83
  signal handlers (which are processed only in this thread), and doing
84
  cleanup at shutdown.
78
  This is the main asynchronous master server. It handles connections to the
79
  master socket.
85 80

  
86 81
  """
87
  def __init__(self, address, rqhandler):
88
    """IOServer constructor
82
  def __init__(self, mainloop, address, handler_class):
83
    """MasterServer constructor
89 84

  
90
    @param address: the address to bind this IOServer to
91
    @param rqhandler: RequestHandler type object
85
    @type mainloop: ganeti.daemon.Mainloop
86
    @param mainloop: Mainloop used to poll for I/O events
87
    @param address: the unix socket address to bind the MasterServer to
88
    @param handler_class: handler class for the connections
92 89

  
93 90
    """
94
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
91
    daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, address)
92
    self.request_handler_class = handler_class
93
    self.mainloop = mainloop
95 94

  
96 95
    # We'll only start threads once we've forked.
97 96
    self.context = None
98 97
    self.request_workers = None
99 98

  
99
  def handle_connection(self, connected_socket, client_address):
100
    self.request_workers.AddTask(self, connected_socket, client_address)
101

  
100 102
  def setup_queue(self):
101 103
    self.context = GanetiContext()
102 104
    self.request_workers = workerpool.WorkerPool("ClientReq",
103 105
                                                 CLIENT_REQUEST_WORKERS,
104 106
                                                 ClientRequestWorker)
105 107

  
106
  def process_request(self, request, client_address):
107
    """Add task to workerpool to process request.
108

  
109
    """
110
    (pid, uid, gid) = utils.GetSocketCredentials(request)
111
    logging.info("Accepted connection from pid=%s, uid=%s, gid=%s",
112
                 pid, uid, gid)
113

  
114
    self.request_workers.AddTask(self, request, client_address)
115

  
116
  def handle_error(self, request, client_address):
117
    logging.exception("Error while handling request")
118

  
119
  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
120
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
121
    """Handle one request at a time until told to quit."""
122
    assert isinstance(signal_handlers, dict) and \
123
           len(signal_handlers) > 0, \
124
           "Broken SignalHandled decorator"
125
    # Since we use SignalHandled only once, the resulting dict will map all
126
    # signals to the same handler. We'll just use the first one.
127
    sighandler = signal_handlers.values()[0]
128
    while not sighandler.called:
129
      self.handle_request()
130

  
131 108
  def server_cleanup(self):
132 109
    """Cleanup the server.
133 110

  
......
136 113

  
137 114
    """
138 115
    try:
139
      self.server_close()
116
      self.close()
140 117
    finally:
141 118
      if self.request_workers:
142 119
        self.request_workers.TerminateWorkers()
......
528 505
  # concurrent execution.
529 506
  utils.RemoveFile(constants.MASTER_SOCKET)
530 507

  
531
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
508
  mainloop = daemon.Mainloop()
509
  master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler)
532 510
  try:
533 511
    rpc.Init()
534 512
    try:
......
541 519

  
542 520
      master.setup_queue()
543 521
      try:
544
        master.serve_forever()
522
        mainloop.Run()
545 523
      finally:
546 524
        master.server_cleanup()
547 525
    finally:

Also available in: Unified diff