Statistics
| Branch: | Tag: | Revision:

root / lib / rpc / client.py @ b3cc1646

History | View | Annotate | Download (6.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for generic RPC clients.
23

24
"""
25

    
26
import logging
27

    
28
from ganeti import pathutils
29
import ganeti.rpc.transport as t
30

    
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti.rpc.errors import (ProtocolError, RequestError, LuxiError)
34
from ganeti import serializer
35

    
36
KEY_METHOD = constants.LUXI_KEY_METHOD
37
KEY_ARGS = constants.LUXI_KEY_ARGS
38
KEY_SUCCESS = constants.LUXI_KEY_SUCCESS
39
KEY_RESULT = constants.LUXI_KEY_RESULT
40
KEY_VERSION = constants.LUXI_KEY_VERSION
41

    
42

    
43
def ParseRequest(msg):
44
  """Parses a request message.
45

46
  """
47
  try:
48
    request = serializer.LoadJson(msg)
49
  except ValueError, err:
50
    raise ProtocolError("Invalid RPC request (parsing error): %s" % err)
51

    
52
  logging.debug("RPC request: %s", request)
53

    
54
  if not isinstance(request, dict):
55
    logging.error("RPC request not a dict: %r", msg)
56
    raise ProtocolError("Invalid RPC request (not a dict)")
57

    
58
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
59
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
60
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
61

    
62
  if method is None or args is None:
63
    logging.error("RPC request missing method or arguments: %r", msg)
64
    raise ProtocolError(("Invalid RPC request (no method or arguments"
65
                         " in request): %r") % msg)
66

    
67
  return (method, args, version)
68

    
69

    
70
def ParseResponse(msg):
71
  """Parses a response message.
72

73
  """
74
  # Parse the result
75
  try:
76
    data = serializer.LoadJson(msg)
77
  except KeyboardInterrupt:
78
    raise
79
  except Exception, err:
80
    raise ProtocolError("Error while deserializing response: %s" % str(err))
81

    
82
  # Validate response
83
  if not (isinstance(data, dict) and
84
          KEY_SUCCESS in data and
85
          KEY_RESULT in data):
86
    raise ProtocolError("Invalid response from server: %r" % data)
87

    
88
  return (data[KEY_SUCCESS], data[KEY_RESULT],
89
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
90

    
91

    
92
def FormatResponse(success, result, version=None):
93
  """Formats a response message.
94

95
  """
96
  response = {
97
    KEY_SUCCESS: success,
98
    KEY_RESULT: result,
99
    }
100

    
101
  if version is not None:
102
    response[KEY_VERSION] = version
103

    
104
  logging.debug("RPC response: %s", response)
105

    
106
  return serializer.DumpJson(response)
107

    
108

    
109
def FormatRequest(method, args, version=None):
110
  """Formats a request message.
111

112
  """
113
  # Build request
114
  request = {
115
    KEY_METHOD: method,
116
    KEY_ARGS: args,
117
    }
118

    
119
  if version is not None:
120
    request[KEY_VERSION] = version
121

    
122
  # Serialize the request
123
  return serializer.DumpJson(request)
124

    
125

    
126
def CallRPCMethod(transport_cb, method, args, version=None):
127
  """Send a RPC request via a transport and return the response.
128

129
  """
130
  assert callable(transport_cb)
131

    
132
  request_msg = FormatRequest(method, args, version=version)
133

    
134
  # Send request and wait for response
135
  response_msg = transport_cb(request_msg)
136

    
137
  (success, result, resp_version) = ParseResponse(response_msg)
138

    
139
  # Verify version if there was one in the response
140
  if resp_version is not None and resp_version != version:
141
    raise LuxiError("RPC version mismatch, client %s, response %s" %
142
                    (version, resp_version))
143

    
144
  if success:
145
    return result
146

    
147
  errors.MaybeRaise(result)
148
  raise RequestError(result)
149

    
150

    
151
class AbstractClient(object):
152
  """High-level client abstraction.
153

154
  This uses a backing Transport-like class on top of which it
155
  implements data serialization/deserialization.
156

157
  """
158

    
159
  def __init__(self, address=None, timeouts=None,
160
               transport=t.Transport):
161
    """Constructor for the Client class.
162

163
    Arguments:
164
      - address: a valid address the the used transport class
165
      - timeout: a list of timeouts, to be used on connect and read/write
166
      - transport: a Transport-like class
167

168

169
    If timeout is not passed, the default timeouts of the transport
170
    class are used.
171

172
    """
173
    if address is None:
174
      address = pathutils.MASTER_SOCKET
175
    self.address = address
176
    self.timeouts = timeouts
177
    self.transport_class = transport
178
    self.transport = None
179
    self._InitTransport()
180
    # The version used in RPC communication, by default unused:
181
    self.version = None
182

    
183
  def _InitTransport(self):
184
    """(Re)initialize the transport if needed.
185

186
    """
187
    if self.transport is None:
188
      self.transport = self.transport_class(self.address,
189
                                            timeouts=self.timeouts)
190

    
191
  def _CloseTransport(self):
192
    """Close the transport, ignoring errors.
193

194
    """
195
    if self.transport is None:
196
      return
197
    try:
198
      old_transp = self.transport
199
      self.transport = None
200
      old_transp.Close()
201
    except Exception: # pylint: disable=W0703
202
      pass
203

    
204
  def _SendMethodCall(self, data):
205
    # Send request and wait for response
206
    try:
207
      self._InitTransport()
208
      return self.transport.Call(data)
209
    except Exception:
210
      self._CloseTransport()
211
      raise
212

    
213
  def Close(self):
214
    """Close the underlying connection.
215

216
    """
217
    self._CloseTransport()
218

    
219
  def close(self):
220
    """Same as L{Close}, to be used with contextlib.closing(...).
221

222
    """
223
    self.Close()
224

    
225
  def CallMethod(self, method, args):
226
    """Send a generic request and return the response.
227

228
    """
229
    if not isinstance(args, (list, tuple)):
230
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
231
                                   " expected list, got %s" % type(args))
232
    return CallRPCMethod(self._SendMethodCall, method, args,
233
                         version=self.version)