Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 610bc9ee

History | View | Annotate | Download (9.1 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import time
33
import collections
34
import Queue
35
import random
36
import signal
37
import simplejson
38
import logging
39

    
40
from cStringIO import StringIO
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import mcpu
46
from ganeti import opcodes
47
from ganeti import jqueue
48
from ganeti import locking
49
from ganeti import luxi
50
from ganeti import utils
51
from ganeti import errors
52
from ganeti import ssconf
53
from ganeti import logger
54
from ganeti import workerpool
55

    
56

    
57
CLIENT_REQUEST_WORKERS = 16
58

    
59
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
60
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
61

    
62

    
63
class ClientRequestWorker(workerpool.BaseWorker):
64
  def RunTask(self, server, request, client_address):
65
    """Process the request.
66

    
67
    This is copied from the code in ThreadingMixIn.
68

    
69
    """
70
    try:
71
      server.finish_request(request, client_address)
72
      server.close_request(request)
73
    except:
74
      server.handle_error(request, client_address)
75
      server.close_request(request)
76

    
77

    
78
class IOServer(SocketServer.UnixStreamServer):
79
  """IO thread class.
80

    
81
  This class takes care of initializing the other threads, setting
82
  signal handlers (which are processed only in this thread), and doing
83
  cleanup at shutdown.
84

    
85
  """
86
  def __init__(self, address, rqhandler, context):
87
    """IOServer constructor
88

    
89
    Args:
90
      address: the address to bind this IOServer to
91
      rqhandler: RequestHandler type object
92
      context: Context Object common to all worker threads
93

    
94
    """
95
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
96
    self.context = context
97

    
98
    # We'll only start threads once we've forked.
99
    self.jobqueue = None
100
    self.request_workers = None
101

    
102
  def setup_queue(self):
103
    self.jobqueue = jqueue.JobQueue(self.context)
104
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
105
                                                 ClientRequestWorker)
106

    
107
  def process_request(self, request, client_address):
108
    """Add task to workerpool to process request.
109

    
110
    """
111
    self.request_workers.AddTask(self, request, client_address)
112

    
113
  def serve_forever(self):
114
    """Handle one request at a time until told to quit."""
115
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
116
    try:
117
      while not sighandler.called:
118
        self.handle_request()
119
    finally:
120
      sighandler.Reset()
121

    
122
  def server_cleanup(self):
123
    """Cleanup the server.
124

    
125
    This involves shutting down the processor threads and the master
126
    socket.
127

    
128
    """
129
    try:
130
      self.server_close()
131
      utils.RemoveFile(constants.MASTER_SOCKET)
132
    finally:
133
      if self.request_workers:
134
        self.request_workers.TerminateWorkers()
135
      if self.jobqueue:
136
        self.jobqueue.Shutdown()
137

    
138

    
139
class ClientRqHandler(SocketServer.BaseRequestHandler):
140
  """Client handler"""
141
  EOM = '\3'
142
  READ_SIZE = 4096
143

    
144
  def setup(self):
145
    self._buffer = ""
146
    self._msgs = collections.deque()
147
    self._ops = ClientOps(self.server)
148

    
149
  def handle(self):
150
    while True:
151
      msg = self.read_message()
152
      if msg is None:
153
        logging.info("client closed connection")
154
        break
155

    
156
      request = simplejson.loads(msg)
157
      logging.debug("request: %s", request)
158
      if not isinstance(request, dict):
159
        logging.error("wrong request received: %s", msg)
160
        break
161

    
162
      method = request.get(luxi.KEY_METHOD, None)
163
      args = request.get(luxi.KEY_ARGS, None)
164
      if method is None or args is None:
165
        logging.error("no method or args in request")
166
        break
167

    
168
      success = False
169
      try:
170
        result = self._ops.handle_request(method, args)
171
        success = True
172
      except:
173
        logging.error("Unexpected exception", exc_info=True)
174
        err = sys.exc_info()
175
        result = "Caught exception: %s" % str(err[1])
176

    
177
      response = {
178
        luxi.KEY_SUCCESS: success,
179
        luxi.KEY_RESULT: result,
180
        }
181
      logging.debug("response: %s", response)
182
      self.send_message(simplejson.dumps(response))
183

    
184
  def read_message(self):
185
    while not self._msgs:
186
      data = self.request.recv(self.READ_SIZE)
187
      if not data:
188
        return None
189
      new_msgs = (self._buffer + data).split(self.EOM)
190
      self._buffer = new_msgs.pop()
191
      self._msgs.extend(new_msgs)
192
    return self._msgs.popleft()
193

    
194
  def send_message(self, msg):
195
    #print "sending", msg
196
    self.request.sendall(msg + self.EOM)
197

    
198

    
199
class ClientOps:
200
  """Class holding high-level client operations."""
201
  def __init__(self, server):
202
    self.server = server
203

    
204
  def handle_request(self, method, args):
205
    queue = self.server.jobqueue
206

    
207
    # TODO: Parameter validation
208

    
209
    if method == luxi.REQ_SUBMIT_JOB:
210
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
211
      return queue.SubmitJob(ops)
212

    
213
    elif method == luxi.REQ_CANCEL_JOB:
214
      (job_id, ) = args
215
      return queue.CancelJob(job_id)
216

    
217
    elif method == luxi.REQ_ARCHIVE_JOB:
218
      (job_id, ) = args
219
      return queue.ArchiveJob(job_id)
220

    
221
    elif method == luxi.REQ_QUERY_JOBS:
222
      (job_ids, fields) = args
223
      return queue.QueryJobs(job_ids, fields)
224

    
225
    else:
226
      raise ValueError("Invalid operation")
227

    
228

    
229
class GanetiContext(object):
230
  """Context common to all ganeti threads.
231

    
232
  This class creates and holds common objects shared by all threads.
233

    
234
  """
235
  _instance = None
236

    
237
  def __init__(self):
238
    """Constructs a new GanetiContext object.
239

    
240
    There should be only a GanetiContext object at any time, so this
241
    function raises an error if this is not the case.
242

    
243
    """
244
    assert self.__class__._instance is None, "double GanetiContext instance"
245

    
246
    # Create a ConfigWriter...
247
    self.cfg = config.ConfigWriter()
248
    # And a GanetiLockingManager...
249
    self.glm = locking.GanetiLockManager(
250
                self.cfg.GetNodeList(),
251
                self.cfg.GetInstanceList())
252

    
253
    # setting this also locks the class against attribute modifications
254
    self.__class__._instance = self
255

    
256
  def __setattr__(self, name, value):
257
    """Setting GanetiContext attributes is forbidden after initialization.
258

    
259
    """
260
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
261
    object.__setattr__(self, name, value)
262

    
263

    
264
def CheckMaster(debug):
265
  """Checks the node setup.
266

    
267
  If this is the master, the function will return. Otherwise it will
268
  exit with an exit code based on the node status.
269

    
270
  """
271
  try:
272
    ss = ssconf.SimpleStore()
273
    master_name = ss.GetMasterNode()
274
  except errors.ConfigurationError, err:
275
    print "Cluster configuration incomplete: '%s'" % str(err)
276
    sys.exit(EXIT_NODESETUP_ERROR)
277

    
278
  try:
279
    myself = utils.HostInfo()
280
  except errors.ResolverError, err:
281
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
282
    sys.exit(EXIT_NODESETUP_ERROR)
283

    
284
  if myself.name != master_name:
285
    if debug:
286
      sys.stderr.write("Not master, exiting.\n")
287
    sys.exit(EXIT_NOTMASTER)
288

    
289

    
290
def ParseOptions():
291
  """Parse the command line options.
292

    
293
  Returns:
294
    (options, args) as from OptionParser.parse_args()
295

    
296
  """
297
  parser = OptionParser(description="Ganeti master daemon",
298
                        usage="%prog [-f] [-d]",
299
                        version="%%prog (ganeti) %s" %
300
                        constants.RELEASE_VERSION)
301

    
302
  parser.add_option("-f", "--foreground", dest="fork",
303
                    help="Don't detach from the current terminal",
304
                    default=True, action="store_false")
305
  parser.add_option("-d", "--debug", dest="debug",
306
                    help="Enable some debug messages",
307
                    default=False, action="store_true")
308
  options, args = parser.parse_args()
309
  return options, args
310

    
311

    
312
def main():
313
  """Main function"""
314

    
315
  options, args = ParseOptions()
316
  utils.debug = options.debug
317
  utils.no_fork = True
318

    
319
  CheckMaster(options.debug)
320

    
321
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
322

    
323
  # become a daemon
324
  if options.fork:
325
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
326
                    noclose_fds=[master.fileno()])
327

    
328
  logger.SetupDaemon(constants.LOG_MASTERDAEMON, debug=options.debug,
329
                     stderr_logging=not options.fork)
330

    
331
  logging.info("ganeti master daemon startup")
332

    
333
  master.setup_queue()
334
  try:
335
    master.serve_forever()
336
  finally:
337
    master.server_cleanup()
338

    
339

    
340
if __name__ == "__main__":
341
  main()