Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ c2a03789

History | View | Annotate | Download (6.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocl. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also be used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import simplejson
35
import time
36

    
37
from ganeti import opcodes
38

    
39

    
40
KEY_REQUEST = 'request'
41
KEY_DATA = 'data'
42
REQ_SUBMIT = 'submit'
43
REQ_ABORT = 'abort'
44
REQ_QUERY = 'query'
45

    
46
DEF_CTMO = 10
47
DEF_RWTO = 60
48

    
49

    
50
class ProtocolError(Exception):
51
  """Denotes an error in the server communication"""
52

    
53

    
54
class ConnectionClosedError(ProtocolError):
55
  """Connection closed error"""
56

    
57

    
58
class TimeoutError(ProtocolError):
59
  """Operation timeout error"""
60

    
61

    
62
class EncodingError(ProtocolError):
63
  """Encoding failure on the sending side"""
64

    
65

    
66
class DecodingError(ProtocolError):
67
  """Decoding failure on the receiving side"""
68

    
69

    
70
def SerializeJob(job):
71
  """Convert a job description to a string format.
72

73
  """
74
  return simplejson.dumps(job.__getstate__())
75

    
76

    
77
def UnserializeJob(data):
78
  """Load a job from a string format"""
79
  try:
80
    new_data = simplejson.loads(data)
81
  except Exception, err:
82
    raise DecodingError("Error while unserializing: %s" % str(err))
83
  job = opcodes.Job()
84
  job.__setstate__(new_data)
85
  return job
86

    
87

    
88
class Transport:
89
  """Low-level transport class.
90

91
  This is used on the client side.
92

93
  This could be replace by any other class that provides the same
94
  semantics to the Client. This means:
95
    - can send messages and receive messages
96
    - safe for multithreading
97

98
  """
99

    
100
  def __init__(self, address, timeouts=None, eom=None):
101
    """Constructor for the Client class.
102

103
    Arguments:
104
      - address: a valid address the the used transport class
105
      - timeout: a list of timeouts, to be used on connect and read/write
106
      - eom: an identifier to be used as end-of-message which the
107
        upper-layer will guarantee that this identifier will not appear
108
        in any message
109

110
    There are two timeouts used since we might want to wait for a long
111
    time for a response, but the connect timeout should be lower.
112

113
    If not passed, we use a default of 10 and respectively 60 seconds.
114

115
    Note that on reading data, since the timeout applies to an
116
    invidual receive, it might be that the total duration is longer
117
    than timeout value passed (we make a hard limit at twice the read
118
    timeout).
119

120
    """
121
    self.address = address
122
    if timeouts is None:
123
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
124
    else:
125
      self._ctimeout, self._rwtimeout = timeouts
126

    
127
    self.socket = None
128
    self._buffer = ""
129
    self._msgs = collections.deque()
130

    
131
    if eom is None:
132
      self.eom = '\3'
133
    else:
134
      self.eom = eom
135

    
136
    try:
137
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
138
      self.socket.settimeout(self._ctimeout)
139
      try:
140
        self.socket.connect(address)
141
      except socket.timeout, err:
142
        raise TimeoutError("Connection timed out: %s" % str(err))
143
      self.socket.settimeout(self._rwtimeout)
144
    except socket.error:
145
      if self.socket is not None:
146
        self.socket.close()
147
      self.socket = None
148
      raise
149

    
150
  def _CheckSocket(self):
151
    """Make sure we are connected.
152

153
    """
154
    if self.socket is None:
155
      raise ProtocolError("Connection is closed")
156

    
157
  def Send(self, msg):
158
    """Send a message.
159

160
    This just sends a message and doesn't wait for the response.
161

162
    """
163
    if self.eom in msg:
164
      raise EncodingError("Message terminator found in payload")
165
    self._CheckSocket()
166
    try:
167
      self.socket.sendall(msg + self.eom)
168
    except socket.timeout, err:
169
      raise TimeoutError("Sending timeout: %s" % str(err))
170

    
171
  def Recv(self):
172
    """Try to receive a messae from the socket.
173

174
    In case we already have messages queued, we just return from the
175
    queue. Otherwise, we try to read data with a _rwtimeout network
176
    timeout, and making sure we don't go over 2x_rwtimeout as a global
177
    limit.
178

179
    """
180
    self._CheckSocket()
181
    etime = time.time() + self._rwtimeout
182
    while not self._msgs:
183
      if time.time() > etime:
184
        raise TimeoutError("Extended receive timeout")
185
      try:
186
        data = self.socket.recv(4096)
187
      except socket.timeout, err:
188
        raise TimeoutError("Receive timeout: %s" % str(err))
189
      if not data:
190
        raise ConnectionClosedError("Connection closed while reading")
191
      new_msgs = (self._buffer + data).split(self.eom)
192
      self._buffer = new_msgs.pop()
193
      self._msgs.extend(new_msgs)
194
    return self._msgs.popleft()
195

    
196
  def Call(self, msg):
197
    """Send a message and wait for the response.
198

199
    This is just a wrapper over Send and Recv.
200

201
    """
202
    self.Send(msg)
203
    return self.Recv()
204

    
205
  def Close(self):
206
    """Close the socket"""
207
    if self.socket is not None:
208
      self.socket.close()
209
      self.socket = None
210

    
211

    
212
class Client(object):
213
  """High-level client implementation.
214

215
  This uses a backing Transport-like class on top of which it
216
  implements data serialization/deserialization.
217

218
  """
219
  def __init__(self, address, timeouts=None, transport=Transport):
220
    """Constructor for the Client class.
221

222
    Arguments:
223
      - address: a valid address the the used transport class
224
      - timeout: a list of timeouts, to be used on connect and read/write
225
      - transport: a Transport-like class
226

227

228
    If timeout is not passed, the default timeouts of the transport
229
    class are used.
230

231
    """
232
    self.transport = transport(address, timeouts=timeouts)
233

    
234
  def SendRequest(self, request, data):
235
    """Send a generic request and return the response.
236

237
    """
238
    msg = {KEY_REQUEST: request, KEY_DATA: data}
239
    result = self.transport.Call(simplejson.dumps(msg))
240
    try:
241
      data = simplejson.loads(result)
242
    except Exception, err:
243
      raise ProtocolError("Error while deserializing response: %s" % str(err))
244
    return data
245

    
246
  def SubmitJob(self, job):
247
    """Submit a job"""
248
    return self.SendRequest(REQ_SUBMIT, SerializeJob(job))
249

    
250
  def Query(self, data):
251
    """Make a query"""
252
    return self.SendRequest(REQ_QUERY, data)