root / daemons / ganeti-masterd @ ea6e6c2b

#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""


import sys
import SocketServer
import threading
import time
import collections
import Queue
import random
import signal
import simplejson


from cStringIO import StringIO
from optparse import OptionParser

from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf


EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
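  # Size of the pool of worker threads started by setup_processors()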
  QUEUE_PROCESSOR_SIZE = 1

  def __init__(self, address, rqhandler):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
    self.do_quit = False
    self.queue = jqueue.QueueManager()
    self.processors = []
    signal.signal(signal.SIGINT, self.handle_quit_signals)
    signal.signal(signal.SIGTERM, self.handle_quit_signals)

  def setup_processors(self):
    """Spawn the processor threads.

    This initializes the queue and the thread processors. It is done
    separately from the constructor because we want the clone()
    syscalls to happen after the daemonize part.

    """
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.processors.append(threading.Thread(target=PoolWorker,
                                              args=(i, self.queue.new_queue)))
    for t in self.processors:
      t.start()

  def process_request_thread(self, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      self.finish_request(request, client_address)
      self.close_request(request)
    except:
      self.handle_error(request, client_address)
      self.close_request(request)

  def process_request(self, request, client_address):
    """Start a new thread to process the request.

    This is copied from the code in ThreadingMixIn.

    """
    t = threading.Thread(target=self.process_request_thread,
                         args=(request, client_address))
    t.start()

  def handle_quit_signals(self, signum, frame):
    print "received %s in %s" % (signum, frame)
    self.do_quit = True

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    while not self.do_quit:
      self.handle_request()
      print "served request, quit=%s" % (self.do_quit)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    self.server_close()
    utils.RemoveFile(constants.MASTER_SOCKET)
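    # push one None sentinel per worker so each PoolWorker loop exits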
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.queue.new_queue.put(None)
    for idx, t in enumerate(self.processors):
      print "waiting for processor thread %s..." % idx
      t.join()
    print "done threads"


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
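  # Wire protocol: each message is a JSON document terminated by EOM;
  # requests carry 'request' and 'data' keys, replies 'success' and 'result'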
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        print "client closed connection"
        break
      request = simplejson.loads(msg)
      if not isinstance(request, dict):
        print "wrong request received: %s" % msg
        break
      method = request.get('request', None)
      data = request.get('data', None)
      if method is None or data is None:
        print "no method or data in request"
        break
      print "request:", method, data
      result = self._ops.handle_request(method, data)
      print "result:", result
      self.send_message(simplejson.dumps({'success': True, 'result': result}))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
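      # split on EOM; the last fragment is an incomplete message and stays buffered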
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server
    self._cpu = None

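  # lazily create a single opcode processor with a no-op feedback function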
  def _getcpu(self):
    if self._cpu is None:
      self._cpu = mcpu.Processor(lambda x: None)
    return self._cpu

  def handle_request(self, operation, args):
    print operation, args
    if operation == "submit":
      return self.put(args)
    elif operation == "query":
      return self.query(args)
    else:
      raise ValueError("Invalid operation")

  def put(self, args):
    job = luxi.UnserializeJob(args)
    rid = self.server.queue.put(job)
    return rid

  def query(self, args):
    path = args["object"]
    fields = args["fields"]
    names = args["names"]
    if path == "instances":
      opclass = opcodes.OpQueryInstances
    elif path == "jobs":
      # early exit because job querying is special (not via opcodes)
      return self.query_jobs(fields, names)
    else:
      raise ValueError("Invalid object %s" % path)

    op = opclass(output_fields=fields, names=names)
    cpu = self._getcpu()
    result = cpu.ExecOpCode(op)
    return result

  def query_jobs(self, fields, names):
    return self.server.queue.query_jobs(fields, names)


def JobRunner(proc, job):
  """Job executor.

  This function processes a single job in the context of the given
  processor instance.

  Args:
    proc: Ganeti Processor to run the job on
    job: The job to run (unserialized format)

  """
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
  fail = False
  for idx, op in enumerate(job.data.op_list):
    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
    try:
      job.data.op_result[idx] = proc.ExecOpCode(op)
      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
    except (errors.OpPrereqError, errors.OpExecError), err:
      fail = True
      job.data.op_result[idx] = str(err)
      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
  if fail:
    job.SetStatus(opcodes.Job.STATUS_FAIL)
  else:
    job.SetStatus(opcodes.Job.STATUS_SUCCESS)


def PoolWorker(worker_id, incoming_queue):
  """A worker thread function.

  This is the actual processor of a single thread of Job execution.

  Args:
    worker_id: the unique id for this worker
    incoming_queue: a queue to get jobs from

  """
  while True:
    print "worker %s sleeping" % worker_id
    item = incoming_queue.get(True)
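    # None is the shutdown sentinel pushed by IOServer.server_cleanup()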
    if item is None:
      break
    print "worker %s processing job %s" % (worker_id, item.data.job_id)
    #utils.Lock('cmd')
    try:
      proc = mcpu.Processor(feedback=lambda x: None)
      try:
        JobRunner(proc, item)
      except errors.GenericError, err:
        msg = "ganeti exception %s" % err
        item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
        print msg
    finally:
      #utils.Unlock('cmd')
      #utils.LockCleanup()
      pass
    print "worker %s finished job %s" % (worker_id, item.data.job_id)
  print "worker %s exiting" % worker_id


def CheckMaster(debug):
  """Checks the node setup.

  If this is the master, the function will return. Otherwise it will
  exit with an exit code based on the node status.

  """
  try:
    ss = ssconf.SimpleStore()
    master_name = ss.GetMasterNode()
  except errors.ConfigurationError, err:
    print "Cluster configuration incomplete: '%s'" % str(err)
    sys.exit(EXIT_NODESETUP_ERROR)

  try:
    myself = utils.HostInfo()
  except errors.ResolverError, err:
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
    sys.exit(EXIT_NODESETUP_ERROR)

  if myself.name != master_name:
    if debug:
      sys.stderr.write("Not master, exiting.\n")
    sys.exit(EXIT_NOTMASTER)


def ParseOptions():
  """Parse the command line options.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  options, args = parser.parse_args()
  return options, args


def main():
  """Main function"""

  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  CheckMaster(options.debug)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)

  # become a daemon
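  # (noclose_fds keeps the already-bound master socket open across the fork)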
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  try:
    utils.Lock('cmd', debug=options.debug)
  except errors.LockError, err:
    print >> sys.stderr, str(err)
    master.server_cleanup()
    return

  try:
    master.setup_processors()
    try:
      master.serve_forever()
    finally:
      master.server_cleanup()
  finally:
    utils.Unlock('cmd')
    utils.LockCleanup()


if __name__ == "__main__":
  main()