root / daemons / ganeti-masterd @ c54784d9


#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""


import sys
import SocketServer
import time
import collections
import Queue
import random
import signal
import simplejson
import logging

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import logger
from ganeti import workerpool
from ganeti import rpc


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      server.handle_error(request, client_address)
      server.close_request(request)
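
# Each incoming connection on the master socket is dispatched to one of
# CLIENT_REQUEST_WORKERS pool threads; RunTask above mirrors what
# SocketServer.ThreadingMixIn does per request, but reuses a fixed-size
# ganeti workerpool instead of spawning a new thread per connection.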


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler, context):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object
      context: Context Object common to all worker threads

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
    self.context = context

    # We'll only start threads once we've forked.
    self.jobqueue = None
    self.request_workers = None

  def setup_queue(self):
    self.jobqueue = jqueue.JobQueue(self.context)
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
    finally:
      sighandler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
      utils.RemoveFile(constants.MASTER_SOCKET)
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.jobqueue:
        self.jobqueue.Shutdown()
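
# Wire protocol used by ClientRqHandler below: each request and each
# response is a single JSON document terminated by the EOM byte '\3'.
# Requests are JSON objects carrying a method name and its arguments
# under the keys defined by luxi.KEY_METHOD and luxi.KEY_ARGS (defined
# in ganeti.luxi, not shown here); responses carry luxi.KEY_SUCCESS and
# luxi.KEY_RESULT.  read_message() buffers partial recv() data and
# splits on EOM, so several pipelined messages per read are handled.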


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.info("client closed connection")
        break

      request = simplejson.loads(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(simplejson.dumps(response))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
    self.request.sendall(msg + self.EOM)
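
# ClientOps below dispatches on the luxi.REQ_* constants: job operations
# (submit, cancel, archive, query jobs) go straight to the job queue,
# while node and instance queries are turned into OpCodes and executed
# synchronously through mcpu.Processor in _Query().  For REQ_SUBMIT_JOB
# the node list is computed up front because later steps hold queue or
# storage locks and should not acquire the config lock again.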


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    queue = self.server.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      # we need to compute the node list here, since from now on all
      # operations require locks on the queue or the storage, and we
      # shouldn't get another lock
      node_list = self.server.context.cfg.GetNodeList()
      return queue.SubmitJob(ops, node_list)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields) = args
      op = opcodes.OpQueryInstances(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields) = args
      op = opcodes.OpQueryNodes(names=names, output_fields=fields)
      return self._Query(op)

    else:
      raise ValueError("Invalid operation")

  def _DummyLog(self, *args):
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog)
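
# GanetiContext below is a process-wide singleton: __init__ asserts that
# no instance exists yet, and __setattr__ rejects attribute assignments
# once _instance has been set.  The assignments made inside __init__
# still work because they happen before _instance is filled in as the
# very last step of __init__.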


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create a ConfigWriter...
    self.cfg = config.ConfigWriter()
    # And a GanetiLockingManager...
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)


def ParseOptions():
  """Parse the command line options.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  options, args = parser.parse_args()
  return options, args
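
# Startup ordering in main() matters: the listening socket is created
# before Daemonize() so its file descriptor can be kept open across the
# fork (via noclose_fds), logging is configured only after daemonizing
# (stderr logging is enabled when running with -f/--foreground), and the
# job queue plus worker threads are started last, via setup_queue(),
# because threads would not survive the fork.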


def main():
  """Main function"""

  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  ssconf.CheckMaster(options.debug)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  utils.WritePidFile(constants.MASTERD_PID)

  logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                      stderr_logging=not options.fork)

  logging.info("ganeti master daemon startup")

  # activate ip
  master_node = ssconf.SimpleStore().GetMasterNode()
  if not rpc.call_node_start_master(master_node, False):
    logging.error("Can't activate master IP address")

  master.setup_queue()
  try:
    master.serve_forever()
  finally:
    master.server_cleanup()
    utils.RemovePidFile(constants.MASTERD_PID)


if __name__ == "__main__":
  main()
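
# Illustrative only: a minimal client for the protocol implemented above,
# assuming the socket path in constants.MASTER_SOCKET and treating the
# queried field name "name" as a hypothetical example; real clients
# should go through the ganeti.luxi module instead.
#
#   import socket
#   sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#   sock.connect(constants.MASTER_SOCKET)
#   request = {luxi.KEY_METHOD: luxi.REQ_QUERY_NODES,
#              luxi.KEY_ARGS: ([], ["name"])}
#   sock.sendall(simplejson.dumps(request) + '\3')
#   buf = ""
#   while '\3' not in buf:
#     buf += sock.recv(4096)
#   response = simplejson.loads(buf.split('\3')[0])
#   print response[luxi.KEY_SUCCESS], response[luxi.KEY_RESULT]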