Statistics
| Branch: | Tag: | Revision:

root / lib / rpc / transport.py @ f3aebf6f

History | View | Annotate | Download (6.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module that defines a transport for RPC connections.
23

24
A transport can send to and receive messages from some endpoint.
25

26
"""
27

    
28
import collections
29
import errno
30
import logging
31
import socket
32
import time
33

    
34
from ganeti import constants
35
from ganeti import utils
36
from ganeti.rpc import errors
37

    
38

    
39
DEF_CTMO = constants.LUXI_DEF_CTMO
40
DEF_RWTO = constants.LUXI_DEF_RWTO
41

    
42

    
43
class Transport:
44
  """Low-level transport class.
45

46
  This is used on the client side.
47

48
  This could be replace by any other class that provides the same
49
  semantics to the Client. This means:
50
    - can send messages and receive messages
51
    - safe for multithreading
52

53
  """
54

    
55
  def __init__(self, address, timeouts=None):
56
    """Constructor for the Client class.
57

58
    Arguments:
59
      - address: a valid address the the used transport class
60
      - timeout: a list of timeouts, to be used on connect and read/write
61

62
    There are two timeouts used since we might want to wait for a long
63
    time for a response, but the connect timeout should be lower.
64

65
    If not passed, we use a default of 10 and respectively 60 seconds.
66

67
    Note that on reading data, since the timeout applies to an
68
    invidual receive, it might be that the total duration is longer
69
    than timeout value passed (we make a hard limit at twice the read
70
    timeout).
71

72
    """
73
    self.address = address
74
    if timeouts is None:
75
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
76
    else:
77
      self._ctimeout, self._rwtimeout = timeouts
78

    
79
    self.socket = None
80
    self._buffer = ""
81
    self._msgs = collections.deque()
82

    
83
    try:
84
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
85

    
86
      # Try to connect
87
      try:
88
        utils.Retry(self._Connect, 1.0, self._ctimeout,
89
                    args=(self.socket, address, self._ctimeout))
90
      except utils.RetryTimeout:
91
        raise errors.TimeoutError("Connect timed out")
92

    
93
      self.socket.settimeout(self._rwtimeout)
94
    except (socket.error, errors.NoMasterError):
95
      if self.socket is not None:
96
        self.socket.close()
97
      self.socket = None
98
      raise
99

    
100
  @staticmethod
101
  def _Connect(sock, address, timeout):
102
    sock.settimeout(timeout)
103
    try:
104
      sock.connect(address)
105
    except socket.timeout, err:
106
      raise errors.TimeoutError("Connect timed out: %s" % str(err))
107
    except socket.error, err:
108
      error_code = err.args[0]
109
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
110
        raise errors.NoMasterError(address)
111
      elif error_code in (errno.EPERM, errno.EACCES):
112
        raise errors.PermissionError(address)
113
      elif error_code == errno.EAGAIN:
114
        # Server's socket backlog is full at the moment
115
        raise utils.RetryAgain()
116
      raise
117

    
118
  def _CheckSocket(self):
119
    """Make sure we are connected.
120

121
    """
122
    if self.socket is None:
123
      raise errors.ProtocolError("Connection is closed")
124

    
125
  def Send(self, msg):
126
    """Send a message.
127

128
    This just sends a message and doesn't wait for the response.
129

130
    """
131
    if constants.LUXI_EOM in msg:
132
      raise errors.ProtocolError("Message terminator found in payload")
133

    
134
    self._CheckSocket()
135
    try:
136
      # TODO: sendall is not guaranteed to send everything
137
      self.socket.sendall(msg + constants.LUXI_EOM)
138
    except socket.timeout, err:
139
      raise errors.TimeoutError("Sending timeout: %s" % str(err))
140

    
141
  def Recv(self):
142
    """Try to receive a message from the socket.
143

144
    In case we already have messages queued, we just return from the
145
    queue. Otherwise, we try to read data with a _rwtimeout network
146
    timeout, and making sure we don't go over 2x_rwtimeout as a global
147
    limit.
148

149
    """
150
    self._CheckSocket()
151
    etime = time.time() + self._rwtimeout
152
    while not self._msgs:
153
      if time.time() > etime:
154
        raise errors.TimeoutError("Extended receive timeout")
155
      while True:
156
        try:
157
          data = self.socket.recv(4096)
158
        except socket.timeout, err:
159
          raise errors.TimeoutError("Receive timeout: %s" % str(err))
160
        except socket.error, err:
161
          if err.args and err.args[0] == errno.EAGAIN:
162
            continue
163
          raise
164
        break
165
      if not data:
166
        raise errors.ConnectionClosedError("Connection closed while reading")
167
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
168
      self._buffer = new_msgs.pop()
169
      self._msgs.extend(new_msgs)
170
    return self._msgs.popleft()
171

    
172
  def Call(self, msg):
173
    """Send a message and wait for the response.
174

175
    This is just a wrapper over Send and Recv.
176

177
    """
178
    self.Send(msg)
179
    return self.Recv()
180

    
181
  @staticmethod
182
  def RetryOnBrokenPipe(fn, on_error):
183
    """Calls a given function, retrying if it fails on the 'Broken pipe' IO
184
    exception.
185

186
    This allows to re-establish a broken connection and retry an IO operation.
187

188
    The function receives one an integer argument stating the current retry
189
    number, 0 being the first call, 1 being the retry.
190

191
    If any exception occurs, on_error is invoked first with the exception given
192
    as an argument. Then, if the exception is 'Broken pipe', the function call
193
    is retried once more.
194

195
    """
196
    retries = 2
197
    for try_no in range(0, retries):
198
      try:
199
        return fn(try_no)
200
      except socket.error, ex:
201
        on_error(ex)
202
        # we retry on "Broken pipe", unless it's the last try
203
        if try_no == retries - 1:
204
          raise
205
        elif not (isinstance(ex.args, tuple) and (ex[0] == errno.EPIPE)):
206
          raise
207
      except Exception, ex:
208
        on_error(ex)
209
        raise
210
    assert False # we should never get here
211

    
212
  def Close(self):
213
    """Close the socket"""
214
    if self.socket is not None:
215
      self.socket.close()
216
      self.socket = None