Statistics
| Branch: | Tag: | Revision:

root / lib / rpc / transport.py @ 51b69fc8

History | View | Annotate | Download (6.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module that defines a transport for RPC connections.
23

24
A transport can send to and receive messages from some endpoint.
25

26
"""
27

    
28
import collections
29
import errno
30
import socket
31
import time
32

    
33
from ganeti import constants
34
from ganeti import utils
35
from ganeti.rpc import errors
36

    
37

    
38
DEF_CTMO = constants.LUXI_DEF_CTMO
39
DEF_RWTO = constants.LUXI_DEF_RWTO
40

    
41

    
42
class Transport:
43
  """Low-level transport class.
44

45
  This is used on the client side.
46

47
  This could be replace by any other class that provides the same
48
  semantics to the Client. This means:
49
    - can send messages and receive messages
50
    - safe for multithreading
51

52
  """
53

    
54
  def __init__(self, address, timeouts=None):
55
    """Constructor for the Client class.
56

57
    Arguments:
58
      - address: a valid address the the used transport class
59
      - timeout: a list of timeouts, to be used on connect and read/write
60

61
    There are two timeouts used since we might want to wait for a long
62
    time for a response, but the connect timeout should be lower.
63

64
    If not passed, we use a default of 10 and respectively 60 seconds.
65

66
    Note that on reading data, since the timeout applies to an
67
    invidual receive, it might be that the total duration is longer
68
    than timeout value passed (we make a hard limit at twice the read
69
    timeout).
70

71
    """
72
    self.address = address
73
    if timeouts is None:
74
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
75
    else:
76
      self._ctimeout, self._rwtimeout = timeouts
77

    
78
    self.socket = None
79
    self._buffer = ""
80
    self._msgs = collections.deque()
81

    
82
    try:
83
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
84

    
85
      # Try to connect
86
      try:
87
        utils.Retry(self._Connect, 1.0, self._ctimeout,
88
                    args=(self.socket, address, self._ctimeout))
89
      except utils.RetryTimeout:
90
        raise errors.TimeoutError("Connect timed out")
91

    
92
      self.socket.settimeout(self._rwtimeout)
93
    except (socket.error, errors.NoMasterError):
94
      if self.socket is not None:
95
        self.socket.close()
96
      self.socket = None
97
      raise
98

    
99
  @staticmethod
100
  def _Connect(sock, address, timeout):
101
    sock.settimeout(timeout)
102
    try:
103
      sock.connect(address)
104
    except socket.timeout, err:
105
      raise errors.TimeoutError("Connect timed out: %s" % str(err))
106
    except socket.error, err:
107
      error_code = err.args[0]
108
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
109
        raise errors.NoMasterError(address)
110
      elif error_code in (errno.EPERM, errno.EACCES):
111
        raise errors.PermissionError(address)
112
      elif error_code == errno.EAGAIN:
113
        # Server's socket backlog is full at the moment
114
        raise utils.RetryAgain()
115
      raise
116

    
117
  def _CheckSocket(self):
118
    """Make sure we are connected.
119

120
    """
121
    if self.socket is None:
122
      raise errors.ProtocolError("Connection is closed")
123

    
124
  def Send(self, msg):
125
    """Send a message.
126

127
    This just sends a message and doesn't wait for the response.
128

129
    """
130
    if constants.LUXI_EOM in msg:
131
      raise errors.ProtocolError("Message terminator found in payload")
132

    
133
    self._CheckSocket()
134
    try:
135
      # TODO: sendall is not guaranteed to send everything
136
      self.socket.sendall(msg + constants.LUXI_EOM)
137
    except socket.timeout, err:
138
      raise errors.TimeoutError("Sending timeout: %s" % str(err))
139

    
140
  def Recv(self):
141
    """Try to receive a message from the socket.
142

143
    In case we already have messages queued, we just return from the
144
    queue. Otherwise, we try to read data with a _rwtimeout network
145
    timeout, and making sure we don't go over 2x_rwtimeout as a global
146
    limit.
147

148
    """
149
    self._CheckSocket()
150
    etime = time.time() + self._rwtimeout
151
    while not self._msgs:
152
      if time.time() > etime:
153
        raise errors.TimeoutError("Extended receive timeout")
154
      while True:
155
        try:
156
          data = self.socket.recv(4096)
157
        except socket.timeout, err:
158
          raise errors.TimeoutError("Receive timeout: %s" % str(err))
159
        except socket.error, err:
160
          if err.args and err.args[0] == errno.EAGAIN:
161
            continue
162
          raise
163
        break
164
      if not data:
165
        raise errors.ConnectionClosedError("Connection closed while reading")
166
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
167
      self._buffer = new_msgs.pop()
168
      self._msgs.extend(new_msgs)
169
    return self._msgs.popleft()
170

    
171
  def Call(self, msg):
172
    """Send a message and wait for the response.
173

174
    This is just a wrapper over Send and Recv.
175

176
    """
177
    self.Send(msg)
178
    return self.Recv()
179

    
180
  @staticmethod
181
  def RetryOnBrokenPipe(fn, on_error):
182
    """Calls a given function, retrying if it fails on the 'Broken pipe' IO
183
    exception.
184

185
    This allows to re-establish a broken connection and retry an IO operation.
186

187
    The function receives one an integer argument stating the current retry
188
    number, 0 being the first call, 1 being the retry.
189

190
    If any exception occurs, on_error is invoked first with the exception given
191
    as an argument. Then, if the exception is 'Broken pipe', the function call
192
    is retried once more.
193

194
    """
195
    retries = 2
196
    for try_no in range(0, retries):
197
      try:
198
        return fn(try_no)
199
      except socket.error, ex:
200
        on_error(ex)
201
        # we retry on "Broken pipe", unless it's the last try
202
        if try_no == retries - 1:
203
          raise
204
        elif not (isinstance(ex.args, tuple) and (ex[0] == errno.EPIPE)):
205
          raise
206
      except Exception, ex:
207
        on_error(ex)
208
        raise
209
    assert False # we should never get here
210

    
211
  def Close(self):
212
    """Close the socket"""
213
    if self.socket is not None:
214
      self.socket.close()
215
      self.socket = None