
root / daemons / ganeti-masterd @ ffeffa1d


#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""


import SocketServer
import threading
import time
import collections
import Queue
import random
import signal
import simplejson


from cStringIO import StringIO

from ganeti import constants
from ganeti import errors
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import luxi
from ganeti import utils


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  QUEUE_PROCESSOR_SIZE = 1

  def __init__(self, address, rqhandler):
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
    self.do_quit = False
    self.queue = jqueue.QueueManager()
    self.processors = []
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.processors.append(threading.Thread(target=PoolWorker,
                                              args=(i, self.queue.new_queue)))
    for t in self.processors:
      t.start()
    signal.signal(signal.SIGINT, self.handle_sigint)

  def process_request_thread(self, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      self.finish_request(request, client_address)
      self.close_request(request)
    except:
      self.handle_error(request, client_address)
      self.close_request(request)

  def process_request(self, request, client_address):
    """Start a new thread to process the request.

    This is copied from the code in ThreadingMixIn.

    """
    t = threading.Thread(target=self.process_request_thread,
                         args=(request, client_address))
    t.start()

  def handle_sigint(self, signum, frame):
    print "received %s in %s" % (signum, frame)
    self.do_quit = True
    self.server_close()
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.queue.new_queue.put(None)

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    while not self.do_quit:
      self.handle_request()


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096
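
  # Wire protocol as implemented by handle()/read_message()/send_message():
  # every message is a JSON document terminated by the EOM byte ('\x03').
  # A request is a dict with a 'request' (method name) and a 'data' key;
  # a successful reply has the form {'success': True, 'result': ...}.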

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        print "client closed connection"
        break
      request = simplejson.loads(msg)
      if not isinstance(request, dict):
        print "wrong request received: %s" % msg
        break
      method = request.get('request', None)
      data = request.get('data', None)
      if method is None or data is None:
        print "no method or data in request"
        break
      print "request:", method, data
      result = self._ops.handle_request(method, data)
      print "result:", result
      self.send_message(simplejson.dumps({'success': True, 'result': result}))

  def read_message(self):
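    # Keep reading until at least one complete (EOM-terminated) message
    # is buffered; any trailing partial message stays in self._buffer for
    # the next call.  Returns None once the peer closes the connection.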
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server
    self._cpu = None

  def _getcpu(self):
    if self._cpu is None:
      self._cpu = mcpu.Processor(lambda x: None)
    return self._cpu

  def handle_request(self, operation, args):
    print operation, args
    if operation == "submit":
      return self.put(args)
    elif operation == "query":
      return self.query(args)
    else:
      raise ValueError("Invalid operation")

  def put(self, args):
    job = luxi.UnserializeJob(args)
    rid = self.server.queue.put(job)
    return rid

  def query(self, args):
    path = args["object"]
    fields = args["fields"]
    names = args["names"]
    if path == "instances":
      opclass = opcodes.OpQueryInstances
    else:
      raise ValueError("Invalid object %s" % path)

    op = opclass(output_fields=fields, names=names)
    cpu = self._getcpu()
    result = cpu.ExecOpCode(op)
    return result

  def query_job(self, rid):
    rid = int(rid)
    job = self.server.queue.query(rid)
    return job


def JobRunner(proc, job):
  """Job executor.

  This function processes a single job in the context of the given
  processor instance.

  """
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
  for op in job.data.op_list:
    proc.ExecOpCode(op)
  job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK)


def PoolWorker(worker_id, incoming_queue):
  """A worker thread function.

  This is the actual processor of a single thread of Job execution.

  """
  while True:
    print "worker %s sleeping" % worker_id
    item = incoming_queue.get(True)
    if item is None:
      break
    print "worker %s processing job %s" % (worker_id, item.data.job_id)
    utils.Lock('cmd')
    try:
      proc = mcpu.Processor(feedback=lambda x: None)
      try:
        JobRunner(proc, item)
      except errors.GenericError, err:
        print "ganeti exception %s" % err
    finally:
      utils.Unlock('cmd')
      utils.LockCleanup()
    print "worker %s finished job %s" % (worker_id, item.data.job_id)
  print "worker %s exiting" % worker_id


def main():
  """Main function"""

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  master.serve_forever()


if __name__ == "__main__":
  main()
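

# A minimal sketch (not part of the daemon) of how a client could talk
# to this socket, assuming the daemon is running and that
# constants.MASTER_SOCKET points at its unix socket; the query fields
# and names below are only illustrative:
#
#   import socket, simplejson
#   from ganeti import constants
#
#   sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#   sock.connect(constants.MASTER_SOCKET)
#   request = {"request": "query",
#              "data": {"object": "instances", "fields": ["name"],
#                       "names": []}}
#   sock.sendall(simplejson.dumps(request) + '\3')
#   reply = ""
#   while not reply.endswith('\3'):
#     reply += sock.recv(4096)
#   print simplejson.loads(reply[:-1])
#   sock.close()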