Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ cc2bea8b

History | View | Annotate | Download (12 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviate from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import threading
33
import time
34
import collections
35
import Queue
36
import random
37
import signal
38
import simplejson
39
import logging
40

    
41
from cStringIO import StringIO
42
from optparse import OptionParser
43

    
44
from ganeti import config
45
from ganeti import constants
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import logger
55

    
56

    
57
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
58
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
59

    
60

    
61
class IOServer(SocketServer.UnixStreamServer):
62
  """IO thread class.
63

    
64
  This class takes care of initializing the other threads, setting
65
  signal handlers (which are processed only in this thread), and doing
66
  cleanup at shutdown.
67

    
68
  """
69
  QUEUE_PROCESSOR_SIZE = 5
70

    
71
  def __init__(self, address, rqhandler, context):
72
    """IOServer constructor
73

    
74
    Args:
75
      address: the address to bind this IOServer to
76
      rqhandler: RequestHandler type object
77
      context: Context Object common to all worker threads
78

    
79
    """
80
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
81
    self.do_quit = False
82
    self.queue = jqueue.QueueManager()
83
    self.context = context
84
    self.processors = []
85
    signal.signal(signal.SIGINT, self.handle_quit_signals)
86
    signal.signal(signal.SIGTERM, self.handle_quit_signals)
87

    
88
  def setup_processors(self):
89
    """Spawn the processors threads.
90

    
91
    This initializes the queue and the thread processors. It is done
92
    separately from the constructor because we want the clone()
93
    syscalls to happen after the daemonize part.
94

    
95
    """
96
    for i in range(self.QUEUE_PROCESSOR_SIZE):
97
      self.processors.append(threading.Thread(target=PoolWorker,
98
                                              args=(i, self.queue.new_queue,
99
                                                    self.context)))
100
    for t in self.processors:
101
      t.start()
102

    
103
  def process_request_thread(self, request, client_address):
104
    """Process the request.
105

    
106
    This is copied from the code in ThreadingMixIn.
107

    
108
    """
109
    try:
110
      self.finish_request(request, client_address)
111
      self.close_request(request)
112
    except:
113
      self.handle_error(request, client_address)
114
      self.close_request(request)
115

    
116
  def process_request(self, request, client_address):
117
    """Start a new thread to process the request.
118

    
119
    This is copied from the coode in ThreadingMixIn.
120

    
121
    """
122
    t = threading.Thread(target=self.process_request_thread,
123
                         args=(request, client_address))
124
    t.start()
125

    
126
  def handle_quit_signals(self, signum, frame):
127
    print "received %s in %s" % (signum, frame)
128
    self.do_quit = True
129

    
130
  def serve_forever(self):
131
    """Handle one request at a time until told to quit."""
132
    while not self.do_quit:
133
      self.handle_request()
134
      print "served request, quit=%s" % (self.do_quit)
135

    
136
  def server_cleanup(self):
137
    """Cleanup the server.
138

    
139
    This involves shutting down the processor threads and the master
140
    socket.
141

    
142
    """
143
    self.server_close()
144
    utils.RemoveFile(constants.MASTER_SOCKET)
145
    for i in range(self.QUEUE_PROCESSOR_SIZE):
146
      self.queue.new_queue.put(None)
147
    for idx, t in enumerate(self.processors):
148
      logging.debug("waiting for processor thread %s...", idx)
149
      t.join()
150
    logging.debug("threads done")
151

    
152

    
153
class ClientRqHandler(SocketServer.BaseRequestHandler):
154
  """Client handler"""
155
  EOM = '\3'
156
  READ_SIZE = 4096
157

    
158
  def setup(self):
159
    self._buffer = ""
160
    self._msgs = collections.deque()
161
    self._ops = ClientOps(self.server)
162

    
163
  def handle(self):
164
    while True:
165
      msg = self.read_message()
166
      if msg is None:
167
        print "client closed connection"
168
        break
169
      request = simplejson.loads(msg)
170
      if not isinstance(request, dict):
171
        print "wrong request received: %s" % msg
172
        break
173
      method = request.get('request', None)
174
      data = request.get('data', None)
175
      if method is None or data is None:
176
        print "no method or data in request"
177
        break
178
      print "request:", method, data
179
      result = self._ops.handle_request(method, data)
180
      print "result:", result
181
      self.send_message(simplejson.dumps({'success': True, 'result': result}))
182

    
183
  def read_message(self):
184
    while not self._msgs:
185
      data = self.request.recv(self.READ_SIZE)
186
      if not data:
187
        return None
188
      new_msgs = (self._buffer + data).split(self.EOM)
189
      self._buffer = new_msgs.pop()
190
      self._msgs.extend(new_msgs)
191
    return self._msgs.popleft()
192

    
193
  def send_message(self, msg):
194
    #print "sending", msg
195
    self.request.sendall(msg + self.EOM)
196

    
197

    
198
class ClientOps:
199
  """Class holding high-level client operations."""
200
  def __init__(self, server):
201
    self.server = server
202
    self._cpu = None
203

    
204
  def _getcpu(self):
205
    if self._cpu is None:
206
      self._cpu = mcpu.Processor(lambda x: None)
207
    return self._cpu
208

    
209
  def handle_request(self, operation, args):
210
    print operation, args
211
    if operation == "submit":
212
      return self.put(args)
213
    elif operation == "query":
214
      return self.query(args)
215
    else:
216
      raise ValueError("Invalid operation")
217

    
218
  def put(self, args):
219
    job = luxi.UnserializeJob(args)
220
    rid = self.server.queue.put(job)
221
    return rid
222

    
223
  def query(self, args):
224
    path = args["object"]
225
    fields = args["fields"]
226
    names = args["names"]
227
    if path == "instances":
228
      opclass = opcodes.OpQueryInstances
229
    elif path == "jobs":
230
      # early exit because job query-ing is special (not via opcodes)
231
      return self.query_jobs(fields, names)
232
    else:
233
      raise ValueError("Invalid object %s" % path)
234

    
235
    op = opclass(output_fields = fields, names=names)
236
    cpu = self._getcpu()
237
    result = cpu.ExecOpCode(op)
238
    return result
239

    
240
  def query_jobs(self, fields, names):
241
    return self.server.queue.query_jobs(fields, names)
242

    
243

    
244
def JobRunner(proc, job, context):
245
  """Job executor.
246

    
247
  This functions processes a single job in the context of given
248
  processor instance.
249

    
250
  Args:
251
    proc: Ganeti Processor to run the job on
252
    job: The job to run (unserialized format)
253
    context: Ganeti shared context
254

    
255
  """
256
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
257
  fail = False
258
  for idx, op in enumerate(job.data.op_list):
259
    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
260
    try:
261
      job.data.op_result[idx] = proc.ExecOpCode(op)
262
      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
263
    except (errors.OpPrereqError, errors.OpExecError), err:
264
      fail = True
265
      job.data.op_result[idx] = str(err)
266
      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
267
  if fail:
268
    job.SetStatus(opcodes.Job.STATUS_FAIL)
269
  else:
270
    job.SetStatus(opcodes.Job.STATUS_SUCCESS)
271

    
272

    
273
def PoolWorker(worker_id, incoming_queue, context):
274
  """A worker thread function.
275

    
276
  This is the actual processor of a single thread of Job execution.
277

    
278
  Args:
279
    worker_id: the unique id for this worker
280
    incoming_queue: a queue to get jobs from
281
    context: the common server context, containing all shared data and
282
             synchronization structures.
283

    
284
  """
285
  while True:
286
    logging.debug("worker %s sleeping", worker_id)
287
    item = incoming_queue.get(True)
288
    if item is None:
289
      break
290
    logging.debug("worker %s processing job %s", worker_id, item.data.job_id)
291
    proc = mcpu.Processor(context, feedback=lambda x: None)
292
    try:
293
      JobRunner(proc, item, context)
294
    except errors.GenericError, err:
295
      msg = "ganeti exception"
296
      logging.error(msg, exc_info=err)
297
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
298
    except Exception, err:
299
      msg = "unhandled exception"
300
      logging.error(msg, exc_info=err)
301
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
302
    except:
303
      msg = "unhandled unknown exception"
304
      logging.error(msg, exc_info=True)
305
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
306
    logging.debug("worker %s finish job %s", worker_id, item.data.job_id)
307
  logging.debug("worker %s exiting", worker_id)
308

    
309

    
310
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads:
  the configuration writer (cfg) and the lock manager (glm). Only one
  instance may exist, and its attributes cannot be changed once it has
  been constructed.

  """
  # the one instance created so far, or None
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    Raises:
      AssertionError: if a GanetiContext has already been created

    """
    # Raised explicitly instead of via an assert statement, so that the
    # check is not compiled away when running with python -O.
    if self.__class__._instance is not None:
      raise AssertionError("double GanetiContext instance")

    # Create a ConfigWriter...
    self.cfg = config.ConfigWriter()
    # And a GanetiLockingManager...
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    Raises:
      AssertionError: if an instance has already been fully constructed

    """
    if self.__class__._instance is not None:
      raise AssertionError("Attempt to modify Ganeti Context")
    object.__setattr__(self, name, value)
343

    
344

    
345
def CheckMaster(debug):
346
  """Checks the node setup.
347

    
348
  If this is the master, the function will return. Otherwise it will
349
  exit with an exit code based on the node status.
350

    
351
  """
352
  try:
353
    ss = ssconf.SimpleStore()
354
    master_name = ss.GetMasterNode()
355
  except errors.ConfigurationError, err:
356
    print "Cluster configuration incomplete: '%s'" % str(err)
357
    sys.exit(EXIT_NODESETUP_ERROR)
358

    
359
  try:
360
    myself = utils.HostInfo()
361
  except errors.ResolverError, err:
362
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
363
    sys.exit(EXIT_NODESETUP_ERROR)
364

    
365
  if myself.name != master_name:
366
    if debug:
367
      sys.stderr.write("Not master, exiting.\n")
368
    sys.exit(EXIT_NOTMASTER)
369

    
370

    
371
def ParseOptions():
  """Parse the command line of the master daemon.

  Two flags are known: -f/--foreground (stores False in options.fork,
  which defaults to True) and -d/--debug (stores True in
  options.debug, which defaults to False).

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  version = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(usage="%prog [-f] [-d]",
                        description="Ganeti master daemon",
                        version=version)

  parser.add_option("-f", "--foreground", action="store_false",
                    dest="fork", default=True,
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug", action="store_true",
                    dest="debug", default=False,
                    help="Enable some debug messages")
  return parser.parse_args()
391

    
392

    
393
def main():
394
  """Main function"""
395

    
396
  options, args = ParseOptions()
397
  utils.debug = options.debug
398
  utils.no_fork = True
399

    
400
  logger.SetupLogging(program="ganeti-masterd", debug=options.debug)
401

    
402
  CheckMaster(options.debug)
403

    
404
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
405

    
406
  # become a daemon
407
  if options.fork:
408
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
409
                    noclose_fds=[master.fileno()])
410

    
411
  try:
412
    utils.Lock('cmd', debug=options.debug)
413
  except errors.LockError, err:
414
    print >> sys.stderr, str(err)
415
    master.server_cleanup()
416
    return
417

    
418
  try:
419
    master.setup_processors()
420
    try:
421
      master.serve_forever()
422
    finally:
423
      master.server_cleanup()
424
  finally:
425
    utils.Unlock('cmd')
426
    utils.LockCleanup()
427

    
428

    
429
if __name__ == "__main__":
430
  main()