Statistics
| Branch: | Tag: | Revision:

root / lib / rpc / client.py @ 24c09d5e

History | View | Annotate | Download (6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for generic RPC clients.
23

24
"""
25

    
26
import logging
27

    
28
from ganeti import pathutils
29
import ganeti.rpc.transport as t
30

    
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti.rpc.errors import (ProtocolError, RequestError, LuxiError)
34
from ganeti import serializer
35

    
36
KEY_METHOD = constants.LUXI_KEY_METHOD
37
KEY_ARGS = constants.LUXI_KEY_ARGS
38
KEY_SUCCESS = constants.LUXI_KEY_SUCCESS
39
KEY_RESULT = constants.LUXI_KEY_RESULT
40
KEY_VERSION = constants.LUXI_KEY_VERSION
41

    
42

    
43
def ParseRequest(msg):
44
  """Parses a request message.
45

46
  """
47
  try:
48
    request = serializer.LoadJson(msg)
49
  except ValueError, err:
50
    raise ProtocolError("Invalid RPC request (parsing error): %s" % err)
51

    
52
  logging.debug("RPC request: %s", request)
53

    
54
  if not isinstance(request, dict):
55
    logging.error("RPC request not a dict: %r", msg)
56
    raise ProtocolError("Invalid RPC request (not a dict)")
57

    
58
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
59
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
60
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
61

    
62
  if method is None or args is None:
63
    logging.error("RPC request missing method or arguments: %r", msg)
64
    raise ProtocolError(("Invalid RPC request (no method or arguments"
65
                         " in request): %r") % msg)
66

    
67
  return (method, args, version)
68

    
69

    
70
def ParseResponse(msg):
71
  """Parses a response message.
72

73
  """
74
  # Parse the result
75
  try:
76
    data = serializer.LoadJson(msg)
77
  except KeyboardInterrupt:
78
    raise
79
  except Exception, err:
80
    raise ProtocolError("Error while deserializing response: %s" % str(err))
81

    
82
  # Validate response
83
  if not (isinstance(data, dict) and
84
          KEY_SUCCESS in data and
85
          KEY_RESULT in data):
86
    raise ProtocolError("Invalid response from server: %r" % data)
87

    
88
  return (data[KEY_SUCCESS], data[KEY_RESULT],
89
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
90

    
91

    
92
def FormatResponse(success, result, version=None):
93
  """Formats a response message.
94

95
  """
96
  response = {
97
    KEY_SUCCESS: success,
98
    KEY_RESULT: result,
99
    }
100

    
101
  if version is not None:
102
    response[KEY_VERSION] = version
103

    
104
  logging.debug("RPC response: %s", response)
105

    
106
  return serializer.DumpJson(response)
107

    
108

    
109
def FormatRequest(method, args, version=None):
110
  """Formats a request message.
111

112
  """
113
  # Build request
114
  request = {
115
    KEY_METHOD: method,
116
    KEY_ARGS: args,
117
    }
118

    
119
  if version is not None:
120
    request[KEY_VERSION] = version
121

    
122
  # Serialize the request
123
  return serializer.DumpJson(request)
124

    
125

    
126
def CallRPCMethod(transport_cb, method, args, version=None):
127
  """Send a RPC request via a transport and return the response.
128

129
  """
130
  assert callable(transport_cb)
131

    
132
  request_msg = FormatRequest(method, args, version=version)
133

    
134
  # Send request and wait for response
135
  response_msg = transport_cb(request_msg)
136

    
137
  (success, result, resp_version) = ParseResponse(response_msg)
138

    
139
  # Verify version if there was one in the response
140
  if resp_version is not None and resp_version != version:
141
    raise LuxiError("RPC version mismatch, client %s, response %s" %
142
                    (version, resp_version))
143

    
144
  if success:
145
    return result
146

    
147
  errors.MaybeRaise(result)
148
  raise RequestError(result)
149

    
150

    
151
class AbstractClient(object):
152
  """High-level client abstraction.
153

154
  This uses a backing Transport-like class on top of which it
155
  implements data serialization/deserialization.
156

157
  """
158

    
159
  def __init__(self, address=None, timeouts=None,
160
               transport=t.Transport):
161
    """Constructor for the Client class.
162

163
    Arguments:
164
      - address: a valid address the the used transport class
165
      - timeout: a list of timeouts, to be used on connect and read/write
166
      - transport: a Transport-like class
167

168

169
    If timeout is not passed, the default timeouts of the transport
170
    class are used.
171

172
    """
173
    if address is None:
174
      address = pathutils.MASTER_SOCKET
175
    self.address = address
176
    self.timeouts = timeouts
177
    self.transport_class = transport
178
    self.transport = None
179
    self._InitTransport()
180

    
181
  def _InitTransport(self):
182
    """(Re)initialize the transport if needed.
183

184
    """
185
    if self.transport is None:
186
      self.transport = self.transport_class(self.address,
187
                                            timeouts=self.timeouts)
188

    
189
  def _CloseTransport(self):
190
    """Close the transport, ignoring errors.
191

192
    """
193
    if self.transport is None:
194
      return
195
    try:
196
      old_transp = self.transport
197
      self.transport = None
198
      old_transp.Close()
199
    except Exception: # pylint: disable=W0703
200
      pass
201

    
202
  def _SendMethodCall(self, data):
203
    # Send request and wait for response
204
    try:
205
      self._InitTransport()
206
      return self.transport.Call(data)
207
    except Exception:
208
      self._CloseTransport()
209
      raise
210

    
211
  def Close(self):
212
    """Close the underlying connection.
213

214
    """
215
    self._CloseTransport()
216

    
217
  def close(self):
218
    """Same as L{Close}, to be used with contextlib.closing(...).
219

220
    """
221
    self.Close()
222

    
223
  def CallMethod(self, method, args):
224
    """Send a generic request and return the response.
225

226
    """
227
    if not isinstance(args, (list, tuple)):
228
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
229
                                   " expected list, got %s" % type(args))
230
    return CallRPCMethod(self._SendMethodCall, method, args,
231
                         version=constants.LUXI_VERSION)