Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 99e88451

History | View | Annotate | Download (9.4 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviate from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import time
33
import collections
34
import Queue
35
import random
36
import signal
37
import simplejson
38
import logging
39

    
40
from cStringIO import StringIO
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import mcpu
46
from ganeti import opcodes
47
from ganeti import jqueue
48
from ganeti import locking
49
from ganeti import luxi
50
from ganeti import utils
51
from ganeti import errors
52
from ganeti import ssconf
53
from ganeti import logger
54
from ganeti import workerpool
55

    
56

    
57
# Passed as the first argument to workerpool.WorkerPool in
# IOServer.setup_queue; presumably the number of worker threads that
# serve client connections (confirm against workerpool.WorkerPool).
CLIENT_REQUEST_WORKERS = 16
58

    
59
# Exit status used by CheckMaster when this host's name differs from the
# master node name recorded in the cluster configuration.
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
60
# Exit status used by CheckMaster when the cluster configuration cannot be
# read or the local host name cannot be resolved.
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
61

    
62

    
63
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker that serves a single client connection per task.

  Tasks are queued by IOServer.process_request with the arguments
  (server, request, client_address).

  """
  def RunTask(self, server, request, client_address):
    """Serve one client request and close it afterwards.

    The statement order deliberately mirrors
    SocketServer.ThreadingMixIn.process_request_thread, so do not
    reshuffle it.

    Args:
      server: object providing finish_request, close_request and
              handle_error (the IOServer that queued the task)
      request: the client connection to serve
      client_address: the peer address belonging to the request

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      # Any failure (including one raised while closing) is passed to the
      # server's error hook and the request is closed again afterwards.
      server.handle_error(request, client_address)
      server.close_request(request)
76

    
77

    
78
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler, context):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object
      context: Context Object common to all worker threads

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
    self.context = context

    # We'll only start threads once we've forked.
    self.jobqueue = None
    self.request_workers = None

  def setup_queue(self):
    """Create the job queue and the client request worker pool.

    Must be called before requests are served, since process_request
    relies on the worker pool being available.

    """
    self.jobqueue = jqueue.JobQueue(self.context)
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    Overrides the SocketServer hook so that the request is queued for a
    ClientRequestWorker instead of being handled by the calling thread.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle one request at a time until told to quit.

    A handler for SIGINT and SIGTERM is installed for the duration of
    the loop; the loop ends once the handler reports it was called, and
    the handler is always reset on the way out.

    """
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
    finally:
      sighandler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    The job queue is shut down even if terminating the request workers
    raises; the exception is still propagated to the caller afterwards.

    """
    try:
      self.server_close()
      utils.RemoveFile(constants.MASTER_SOCKET)
    finally:
      try:
        # Both attributes stay None until setup_queue has been called.
        if self.request_workers:
          self.request_workers.TerminateWorkers()
      finally:
        if self.jobqueue:
          self.jobqueue.Shutdown()
137

    
138

    
139
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler

  Reads EOM-terminated JSON requests from the client socket, hands them
  to ClientOps and writes back one EOM-terminated JSON response per
  request.

  """
  # Byte terminating every message on the wire
  EOM = '\3'
  # Maximum number of bytes requested from the socket per recv call
  READ_SIZE = 4096

  def setup(self):
    """Initialize the per-connection receive state."""
    # Data received after the last complete message
    self._buffer = ""
    # Complete messages not yet returned by read_message
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    """Serve requests until the client disconnects or misbehaves.

    A request that cannot be decoded, is not a dictionary, or lacks the
    method or arguments entry is logged and ends the conversation
    without a response. Errors raised while executing a valid request
    are reported back to the client as an unsuccessful response.

    """
    while True:
      msg = self.read_message()
      if msg is None:
        logging.info("client closed connection")
        break

      try:
        request = simplejson.loads(msg)
      except ValueError:
        # Undecodable input is handled like any other malformed request
        # instead of escaping as an unhandled exception.
        logging.error("cannot decode request: %s", msg, exc_info=True)
        break

      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except:
        # Deliberately broad: whatever went wrong is sent to the client
        # as the result of an unsuccessful response.
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(simplejson.dumps(response))

  def read_message(self):
    """Return the next complete message from the client.

    Returns:
      the message without its EOM terminator, or None once the peer has
      closed the connection (any unterminated trailing data is dropped)

    """
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      # The last element is whatever follows the final EOM, i.e. the
      # start of a message that has not been fully received yet.
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    """Send msg to the client, followed by the EOM terminator."""
    self.request.sendall(msg + self.EOM)
197

    
198

    
199
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Remember the server whose job queue and context are used."""
    self.server = server

  def handle_request(self, method, args):
    """Dispatch a single client request to the job queue.

    Args:
      method: one of the luxi.REQ_* request names
      args: the method specific arguments, as sent by the client

    Returns:
      whatever the corresponding job queue call returns

    Raises:
      ValueError: if the method is not a known request

    """
    jobq = self.server.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      job_ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      # we need to compute the node list here, since from now on all
      # operations require locks on the queue or the storage, and we
      # shouldn't get another lock
      nodes = self.server.context.cfg.GetNodeList()
      return jobq.SubmitJob(job_ops, nodes)

    if method == luxi.REQ_CANCEL_JOB:
      return jobq.CancelJob(args)

    if method == luxi.REQ_ARCHIVE_JOB:
      return jobq.ArchiveJob(args)

    if method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      return jobq.QueryJobs(job_ids, fields)

    raise ValueError("Invalid operation")
231

    
232

    
233
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # The one context created so far, or None if none exists yet
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    Raises:
      AssertionError: if a GanetiContext has already been created

    """
    # Raised explicitly instead of via an assert statement so that the
    # check is still enforced when Python runs with -O.
    if self.__class__._instance is not None:
      raise AssertionError("double GanetiContext instance")

    # Create a ConfigWriter...
    self.cfg = config.ConfigWriter()
    # And a GanetiLockingManager...
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    Raises:
      AssertionError: if an instance has already been registered

    """
    if self.__class__._instance is not None:
      raise AssertionError("Attempt to modify Ganeti Context")
    object.__setattr__(self, name, value)
266

    
267

    
268
def CheckMaster(debug):
  """Checks the node setup.

  If this is the master, the function will return. Otherwise it will
  exit with an exit code based on the node status.

  All diagnostics are written to standard error.

  Args:
    debug: whether to print a message when this node is not the master

  """
  try:
    ss = ssconf.SimpleStore()
    master_name = ss.GetMasterNode()
  except errors.ConfigurationError:
    # Exception fetched via sys.exc_info(), as done in
    # ClientRqHandler.handle.
    err = sys.exc_info()[1]
    sys.stderr.write("Cluster configuration incomplete: '%s'\n" % str(err))
    sys.exit(EXIT_NODESETUP_ERROR)

  try:
    myself = utils.HostInfo()
  except errors.ResolverError:
    err = sys.exc_info()[1]
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
    sys.exit(EXIT_NODESETUP_ERROR)

  if myself.name != master_name:
    if debug:
      sys.stderr.write("Not master, exiting.\n")
    sys.exit(EXIT_NOTMASTER)
292

    
293

    
294
def ParseOptions():
  """Parse the command line options.

  Two flags are recognised: -f/--foreground, which clears the "fork"
  option (on by default), and -d/--debug, which sets the "debug" option
  (off by default).

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_text)

  parser.add_option("-f", "--foreground",
                    action="store_false", dest="fork", default=True,
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug",
                    action="store_true", dest="debug", default=False,
                    help="Enable some debug messages")

  return parser.parse_args()
314

    
315

    
316
def main():
  """Main function

  Parses the options, verifies that this node is the master, binds the
  master socket, optionally daemonizes, and then serves client requests
  until the server loop ends. The master socket, the worker threads,
  the job queue and the PID file are cleaned up on the way out, also
  when the startup steps after writing the PID file fail.

  """
  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  CheckMaster(options.debug)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())

  # become a daemon
  if options.fork:
    # the already bound master socket must survive daemonization
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  utils.WritePidFile(constants.MASTERD_PID)
  try:
    try:
      logger.SetupDaemon(constants.LOG_MASTERDAEMON, debug=options.debug,
                         stderr_logging=not options.fork)

      logging.info("ganeti master daemon startup")

      # Threads are only started now, i.e. after the fork. This happens
      # inside the try block so that a failure here still releases the
      # socket; server_cleanup copes with a queue that was never set up.
      master.setup_queue()
      master.serve_forever()
    finally:
      master.server_cleanup()
  finally:
    # Runs even if the server cleanup itself fails.
    utils.RemovePidFile(constants.MASTERD_PID)


if __name__ == "__main__":
  main()