Revision 23e50d39 daemons/ganeti-masterd

b/daemons/ganeti-masterd
29 29

  
30 30
import sys
31 31
import SocketServer
32
import threading
33 32
import time
34 33
import collections
35 34
import Queue
......
52 51
from ganeti import errors
53 52
from ganeti import ssconf
54 53
from ganeti import logger
54
from ganeti import workerpool
55 55

  
56 56

  
57
CLIENT_REQUEST_WORKERS = 16
58

  
57 59
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
58 60
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
59 61

  
60 62

  
63
class ClientRequestWorker(workerpool.BaseWorker):
64
  def RunTask(self, server, request, client_address):
65
    """Process the request.
66

  
67
    This is copied from the code in ThreadingMixIn.
68

  
69
    """
70
    try:
71
      server.finish_request(request, client_address)
72
      server.close_request(request)
73
    except:
74
      server.handle_error(request, client_address)
75
      server.close_request(request)
76

  
77

  
61 78
class IOServer(SocketServer.UnixStreamServer):
62 79
  """IO thread class.
63 80

  
......
81 98

  
82 99
    # We'll only start threads once we've forked.
83 100
    self.jobqueue = None
101
    self.request_workers = None
84 102

  
85 103
    signal.signal(signal.SIGINT, self.handle_quit_signals)
86 104
    signal.signal(signal.SIGTERM, self.handle_quit_signals)
87 105

  
88 106
  def setup_queue(self):
89 107
    self.jobqueue = jqueue.JobQueue(self.context)
90

  
91
  def process_request_thread(self, request, client_address):
92
    """Process the request.
93

  
94
    This is copied from the code in ThreadingMixIn.
95

  
96
    """
97
    try:
98
      self.finish_request(request, client_address)
99
      self.close_request(request)
100
    except:
101
      self.handle_error(request, client_address)
102
      self.close_request(request)
108
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
109
                                                 ClientRequestWorker)
103 110

  
104 111
  def process_request(self, request, client_address):
105
    """Start a new thread to process the request.
106

  
107
    This is copied from the coode in ThreadingMixIn.
112
    """Add task to workerpool to process request.
108 113

  
109 114
    """
110
    t = threading.Thread(target=self.process_request_thread,
111
                         args=(request, client_address))
112
    t.start()
115
    self.request_workers.AddTask(self, request, client_address)
113 116

  
114 117
  def handle_quit_signals(self, signum, frame):
115 118
    print "received %s in %s" % (signum, frame)
......
132 135
      self.server_close()
133 136
      utils.RemoveFile(constants.MASTER_SOCKET)
134 137
    finally:
138
      if self.request_workers:
139
        self.request_workers.Shutdown()
135 140
      if self.jobqueue:
136 141
        self.jobqueue.Shutdown()
137 142

  

Also available in: Unified diff