root / lib / impexpd / __init__.py @ 0559f745
History | View | Annotate | Download (14.2 kB)
1 | bb44b1ae | Michael Hanselmann | #
|
---|---|---|---|
2 | bb44b1ae | Michael Hanselmann | #
|
3 | bb44b1ae | Michael Hanselmann | |
4 | bb44b1ae | Michael Hanselmann | # Copyright (C) 2010 Google Inc.
|
5 | bb44b1ae | Michael Hanselmann | #
|
6 | bb44b1ae | Michael Hanselmann | # This program is free software; you can redistribute it and/or modify
|
7 | bb44b1ae | Michael Hanselmann | # it under the terms of the GNU General Public License as published by
|
8 | bb44b1ae | Michael Hanselmann | # the Free Software Foundation; either version 2 of the License, or
|
9 | bb44b1ae | Michael Hanselmann | # (at your option) any later version.
|
10 | bb44b1ae | Michael Hanselmann | #
|
11 | bb44b1ae | Michael Hanselmann | # This program is distributed in the hope that it will be useful, but
|
12 | bb44b1ae | Michael Hanselmann | # WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 | bb44b1ae | Michael Hanselmann | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 | bb44b1ae | Michael Hanselmann | # General Public License for more details.
|
15 | bb44b1ae | Michael Hanselmann | #
|
16 | bb44b1ae | Michael Hanselmann | # You should have received a copy of the GNU General Public License
|
17 | bb44b1ae | Michael Hanselmann | # along with this program; if not, write to the Free Software
|
18 | bb44b1ae | Michael Hanselmann | # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 | bb44b1ae | Michael Hanselmann | # 02110-1301, USA.
|
20 | bb44b1ae | Michael Hanselmann | |
21 | bb44b1ae | Michael Hanselmann | |
22 | bb44b1ae | Michael Hanselmann | """Classes and functions for import/export daemon.
|
23 | bb44b1ae | Michael Hanselmann |
|
24 | bb44b1ae | Michael Hanselmann | """
|
25 | bb44b1ae | Michael Hanselmann | |
26 | c08d76f5 | Michael Hanselmann | import os |
27 | 34c9ee7b | Michael Hanselmann | import re |
28 | 34c9ee7b | Michael Hanselmann | import socket |
29 | 53dbf14c | Michael Hanselmann | import logging |
30 | c08d76f5 | Michael Hanselmann | import signal |
31 | c08d76f5 | Michael Hanselmann | import errno |
32 | c08d76f5 | Michael Hanselmann | import time |
33 | bb44b1ae | Michael Hanselmann | from cStringIO import StringIO |
34 | bb44b1ae | Michael Hanselmann | |
35 | bb44b1ae | Michael Hanselmann | from ganeti import constants |
36 | bb44b1ae | Michael Hanselmann | from ganeti import errors |
37 | bb44b1ae | Michael Hanselmann | from ganeti import utils |
38 | bb44b1ae | Michael Hanselmann | |
39 | bb44b1ae | Michael Hanselmann | |
40 | 34c9ee7b | Michael Hanselmann | #: Used to recognize point at which socat(1) starts to listen on its socket.
|
41 | 34c9ee7b | Michael Hanselmann | #: The local address is required for the remote peer to connect (in particular
|
42 | 34c9ee7b | Michael Hanselmann | #: the port number).
|
43 | 34c9ee7b | Michael Hanselmann | LISTENING_RE = re.compile(r"^listening on\s+"
|
44 | 34c9ee7b | Michael Hanselmann | r"AF=(?P<family>\d+)\s+"
|
45 | 34c9ee7b | Michael Hanselmann | r"(?P<address>.+):(?P<port>\d+)$", re.I)
|
46 | 34c9ee7b | Michael Hanselmann | |
47 | 34c9ee7b | Michael Hanselmann | #: Used to recognize point at which socat(1) is sending data over the wire
|
48 | 34c9ee7b | Michael Hanselmann | TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
|
49 | 34c9ee7b | Michael Hanselmann | re.I) |
50 | 34c9ee7b | Michael Hanselmann | |
51 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_DEBUG = "D"
|
52 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_INFO = "I"
|
53 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_NOTICE = "N"
|
54 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_WARNING = "W"
|
55 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_ERROR = "E"
|
56 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_FATAL = "F"
|
57 | 34c9ee7b | Michael Hanselmann | |
58 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_IGNORE = frozenset([
|
59 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_DEBUG, |
60 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_INFO, |
61 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_NOTICE, |
62 | 34c9ee7b | Michael Hanselmann | ]) |
63 | 34c9ee7b | Michael Hanselmann | |
64 | c08d76f5 | Michael Hanselmann | #: Used to parse GNU dd(1) statistics
|
65 | c08d76f5 | Michael Hanselmann | DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
|
66 | c08d76f5 | Michael Hanselmann | r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
|
67 | c08d76f5 | Michael Hanselmann | |
68 | c08d76f5 | Michael Hanselmann | #: Used to ignore "N+N records in/out" on dd(1)'s stderr
|
69 | c08d76f5 | Michael Hanselmann | DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
|
70 | c08d76f5 | Michael Hanselmann | |
71 | c08d76f5 | Michael Hanselmann | #: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
|
72 | c08d76f5 | Michael Hanselmann | #: unavailable and SIGUSR1 is used instead)
|
73 | c08d76f5 | Michael Hanselmann | DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) |
74 | c08d76f5 | Michael Hanselmann | |
75 | bb44b1ae | Michael Hanselmann | #: Buffer size: at most this many bytes are transferred at once
|
76 | bb44b1ae | Michael Hanselmann | BUFSIZE = 1024 * 1024 |
77 | bb44b1ae | Michael Hanselmann | |
78 | bb44b1ae | Michael Hanselmann | # Common options for socat
|
79 | bb44b1ae | Michael Hanselmann | SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] |
80 | bb44b1ae | Michael Hanselmann | SOCAT_OPENSSL_OPTS = ["verify=1", "cipher=HIGH", "method=TLSv1"] |
81 | bb44b1ae | Michael Hanselmann | |
82 | 0559f745 | Michael Hanselmann | SOCAT_OPTION_MAXLEN = 400
|
83 | 0559f745 | Michael Hanselmann | |
84 | 34c9ee7b | Michael Hanselmann | (PROG_OTHER, |
85 | c08d76f5 | Michael Hanselmann | PROG_SOCAT, |
86 | c08d76f5 | Michael Hanselmann | PROG_DD, |
87 | f9323011 | Michael Hanselmann | PROG_DD_PID, |
88 | f9323011 | Michael Hanselmann | PROG_EXP_SIZE) = range(1, 6) |
89 | 34c9ee7b | Michael Hanselmann | PROG_ALL = frozenset([
|
90 | 34c9ee7b | Michael Hanselmann | PROG_OTHER, |
91 | 34c9ee7b | Michael Hanselmann | PROG_SOCAT, |
92 | c08d76f5 | Michael Hanselmann | PROG_DD, |
93 | c08d76f5 | Michael Hanselmann | PROG_DD_PID, |
94 | f9323011 | Michael Hanselmann | PROG_EXP_SIZE, |
95 | 34c9ee7b | Michael Hanselmann | ]) |
96 | 34c9ee7b | Michael Hanselmann | |
97 | bb44b1ae | Michael Hanselmann | |
98 | bb44b1ae | Michael Hanselmann | class CommandBuilder(object): |
99 | c08d76f5 | Michael Hanselmann | def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd): |
100 | bb44b1ae | Michael Hanselmann | """Initializes this class.
|
101 | bb44b1ae | Michael Hanselmann |
|
102 | bb44b1ae | Michael Hanselmann | @param mode: Daemon mode (import or export)
|
103 | bb44b1ae | Michael Hanselmann | @param opts: Options object
|
104 | bb44b1ae | Michael Hanselmann | @type socat_stderr_fd: int
|
105 | bb44b1ae | Michael Hanselmann | @param socat_stderr_fd: File descriptor socat should write its stderr to
|
106 | c08d76f5 | Michael Hanselmann | @type dd_stderr_fd: int
|
107 | c08d76f5 | Michael Hanselmann | @param dd_stderr_fd: File descriptor dd should write its stderr to
|
108 | c08d76f5 | Michael Hanselmann | @type dd_pid_fd: int
|
109 | c08d76f5 | Michael Hanselmann | @param dd_pid_fd: File descriptor the child should write dd's PID to
|
110 | bb44b1ae | Michael Hanselmann |
|
111 | bb44b1ae | Michael Hanselmann | """
|
112 | bb44b1ae | Michael Hanselmann | self._opts = opts
|
113 | bb44b1ae | Michael Hanselmann | self._mode = mode
|
114 | bb44b1ae | Michael Hanselmann | self._socat_stderr_fd = socat_stderr_fd
|
115 | c08d76f5 | Michael Hanselmann | self._dd_stderr_fd = dd_stderr_fd
|
116 | c08d76f5 | Michael Hanselmann | self._dd_pid_fd = dd_pid_fd
|
117 | bb44b1ae | Michael Hanselmann | |
118 | bb44b1ae | Michael Hanselmann | @staticmethod
|
119 | bb44b1ae | Michael Hanselmann | def GetBashCommand(cmd): |
120 | bb44b1ae | Michael Hanselmann | """Prepares a command to be run in Bash.
|
121 | bb44b1ae | Michael Hanselmann |
|
122 | bb44b1ae | Michael Hanselmann | """
|
123 | bb44b1ae | Michael Hanselmann | return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd] |
124 | bb44b1ae | Michael Hanselmann | |
125 | bb44b1ae | Michael Hanselmann | def _GetSocatCommand(self): |
126 | bb44b1ae | Michael Hanselmann | """Returns the socat command.
|
127 | bb44b1ae | Michael Hanselmann |
|
128 | bb44b1ae | Michael Hanselmann | """
|
129 | bb44b1ae | Michael Hanselmann | common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ |
130 | bb44b1ae | Michael Hanselmann | "key=%s" % self._opts.key, |
131 | bb44b1ae | Michael Hanselmann | "cert=%s" % self._opts.cert, |
132 | bb44b1ae | Michael Hanselmann | "cafile=%s" % self._opts.ca, |
133 | bb44b1ae | Michael Hanselmann | ] |
134 | bb44b1ae | Michael Hanselmann | |
135 | bb44b1ae | Michael Hanselmann | if self._opts.bind is not None: |
136 | bb44b1ae | Michael Hanselmann | common_addr_opts.append("bind=%s" % self._opts.bind) |
137 | bb44b1ae | Michael Hanselmann | |
138 | bb44b1ae | Michael Hanselmann | if self._mode == constants.IEM_IMPORT: |
139 | bb44b1ae | Michael Hanselmann | if self._opts.port is None: |
140 | bb44b1ae | Michael Hanselmann | port = 0
|
141 | bb44b1ae | Michael Hanselmann | else:
|
142 | bb44b1ae | Michael Hanselmann | port = self._opts.port
|
143 | bb44b1ae | Michael Hanselmann | |
144 | bb44b1ae | Michael Hanselmann | addr1 = [ |
145 | bb44b1ae | Michael Hanselmann | "OPENSSL-LISTEN:%s" % port,
|
146 | bb44b1ae | Michael Hanselmann | "reuseaddr",
|
147 | bb44b1ae | Michael Hanselmann | |
148 | bb44b1ae | Michael Hanselmann | # Retry to listen if connection wasn't established successfully, up to
|
149 | bb44b1ae | Michael Hanselmann | # 100 times a second. Note that this still leaves room for DoS attacks.
|
150 | bb44b1ae | Michael Hanselmann | "forever",
|
151 | bb44b1ae | Michael Hanselmann | "intervall=0.01",
|
152 | bb44b1ae | Michael Hanselmann | ] + common_addr_opts |
153 | bb44b1ae | Michael Hanselmann | addr2 = ["stdout"]
|
154 | bb44b1ae | Michael Hanselmann | |
155 | bb44b1ae | Michael Hanselmann | elif self._mode == constants.IEM_EXPORT: |
156 | bb44b1ae | Michael Hanselmann | addr1 = ["stdin"]
|
157 | bb44b1ae | Michael Hanselmann | addr2 = [ |
158 | bb44b1ae | Michael Hanselmann | "OPENSSL:%s:%s" % (self._opts.host, self._opts.port), |
159 | bb44b1ae | Michael Hanselmann | |
160 | bb44b1ae | Michael Hanselmann | # How long to wait per connection attempt
|
161 | bb44b1ae | Michael Hanselmann | "connect-timeout=%s" % self._opts.connect_timeout, |
162 | bb44b1ae | Michael Hanselmann | |
163 | bb44b1ae | Michael Hanselmann | # Retry a few times before giving up to connect (once per second)
|
164 | bb44b1ae | Michael Hanselmann | "retry=%s" % self._opts.connect_retries, |
165 | bb44b1ae | Michael Hanselmann | "intervall=1",
|
166 | bb44b1ae | Michael Hanselmann | ] + common_addr_opts |
167 | bb44b1ae | Michael Hanselmann | |
168 | bb44b1ae | Michael Hanselmann | else:
|
169 | bb44b1ae | Michael Hanselmann | raise errors.GenericError("Invalid mode '%s'" % self._mode) |
170 | bb44b1ae | Michael Hanselmann | |
171 | bb44b1ae | Michael Hanselmann | for i in [addr1, addr2]: |
172 | bb44b1ae | Michael Hanselmann | for value in i: |
173 | 0559f745 | Michael Hanselmann | if len(value) > SOCAT_OPTION_MAXLEN: |
174 | 0559f745 | Michael Hanselmann | raise errors.GenericError("Socat option longer than %s" |
175 | 0559f745 | Michael Hanselmann | " characters: %r" %
|
176 | 0559f745 | Michael Hanselmann | (SOCAT_OPTION_MAXLEN, value)) |
177 | bb44b1ae | Michael Hanselmann | if "," in value: |
178 | bb44b1ae | Michael Hanselmann | raise errors.GenericError("Comma not allowed in socat option" |
179 | bb44b1ae | Michael Hanselmann | " value: %r" % value)
|
180 | bb44b1ae | Michael Hanselmann | |
181 | bb44b1ae | Michael Hanselmann | return [
|
182 | bb44b1ae | Michael Hanselmann | constants.SOCAT_PATH, |
183 | bb44b1ae | Michael Hanselmann | |
184 | bb44b1ae | Michael Hanselmann | # Log to stderr
|
185 | bb44b1ae | Michael Hanselmann | "-ls",
|
186 | bb44b1ae | Michael Hanselmann | |
187 | bb44b1ae | Michael Hanselmann | # Log level
|
188 | bb44b1ae | Michael Hanselmann | "-d", "-d", |
189 | bb44b1ae | Michael Hanselmann | |
190 | bb44b1ae | Michael Hanselmann | # Buffer size
|
191 | bb44b1ae | Michael Hanselmann | "-b%s" % BUFSIZE,
|
192 | bb44b1ae | Michael Hanselmann | |
193 | bb44b1ae | Michael Hanselmann | # Unidirectional mode, the first address is only used for reading, and the
|
194 | bb44b1ae | Michael Hanselmann | # second address is only used for writing
|
195 | bb44b1ae | Michael Hanselmann | "-u",
|
196 | bb44b1ae | Michael Hanselmann | |
197 | bb44b1ae | Michael Hanselmann | ",".join(addr1), ",".join(addr2) |
198 | bb44b1ae | Michael Hanselmann | ] |
199 | bb44b1ae | Michael Hanselmann | |
200 | bb44b1ae | Michael Hanselmann | def _GetTransportCommand(self): |
201 | bb44b1ae | Michael Hanselmann | """Returns the command for the transport part of the daemon.
|
202 | bb44b1ae | Michael Hanselmann |
|
203 | bb44b1ae | Michael Hanselmann | """
|
204 | bb44b1ae | Michael Hanselmann | socat_cmd = ("%s 2>&%d" %
|
205 | bb44b1ae | Michael Hanselmann | (utils.ShellQuoteArgs(self._GetSocatCommand()),
|
206 | bb44b1ae | Michael Hanselmann | self._socat_stderr_fd))
|
207 | bb44b1ae | Michael Hanselmann | |
208 | c08d76f5 | Michael Hanselmann | dd_cmd = StringIO() |
209 | c08d76f5 | Michael Hanselmann | # Setting LC_ALL since we want to parse the output and explicitely
|
210 | c08d76f5 | Michael Hanselmann | # redirecting stdin, as the background process (dd) would have /dev/null as
|
211 | c08d76f5 | Michael Hanselmann | # stdin otherwise
|
212 | c08d76f5 | Michael Hanselmann | dd_cmd.write("{ LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
|
213 | c08d76f5 | Michael Hanselmann | (BUFSIZE, self._dd_stderr_fd))
|
214 | c08d76f5 | Michael Hanselmann | # Send PID to daemon
|
215 | c08d76f5 | Michael Hanselmann | dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd) |
216 | c08d76f5 | Michael Hanselmann | # And wait for dd
|
217 | c08d76f5 | Michael Hanselmann | dd_cmd.write(" wait $pid;")
|
218 | c08d76f5 | Michael Hanselmann | dd_cmd.write(" }")
|
219 | c08d76f5 | Michael Hanselmann | |
220 | bb44b1ae | Michael Hanselmann | compr = self._opts.compress
|
221 | bb44b1ae | Michael Hanselmann | |
222 | bb44b1ae | Michael Hanselmann | assert compr in constants.IEC_ALL |
223 | bb44b1ae | Michael Hanselmann | |
224 | bb44b1ae | Michael Hanselmann | if self._mode == constants.IEM_IMPORT: |
225 | bb44b1ae | Michael Hanselmann | if compr == constants.IEC_GZIP:
|
226 | bb44b1ae | Michael Hanselmann | transport_cmd = "%s | gunzip -c" % socat_cmd
|
227 | bb44b1ae | Michael Hanselmann | else:
|
228 | bb44b1ae | Michael Hanselmann | transport_cmd = socat_cmd |
229 | c08d76f5 | Michael Hanselmann | |
230 | c08d76f5 | Michael Hanselmann | transport_cmd += " | %s" % dd_cmd.getvalue()
|
231 | bb44b1ae | Michael Hanselmann | elif self._mode == constants.IEM_EXPORT: |
232 | bb44b1ae | Michael Hanselmann | if compr == constants.IEC_GZIP:
|
233 | bb44b1ae | Michael Hanselmann | transport_cmd = "gzip -c | %s" % socat_cmd
|
234 | bb44b1ae | Michael Hanselmann | else:
|
235 | bb44b1ae | Michael Hanselmann | transport_cmd = socat_cmd |
236 | c08d76f5 | Michael Hanselmann | |
237 | c08d76f5 | Michael Hanselmann | transport_cmd = "%s | %s" % (dd_cmd.getvalue(), transport_cmd)
|
238 | bb44b1ae | Michael Hanselmann | else:
|
239 | bb44b1ae | Michael Hanselmann | raise errors.GenericError("Invalid mode '%s'" % self._mode) |
240 | bb44b1ae | Michael Hanselmann | |
241 | bb44b1ae | Michael Hanselmann | # TODO: Run transport as separate user
|
242 | bb44b1ae | Michael Hanselmann | # The transport uses its own shell to simplify running it as a separate user
|
243 | bb44b1ae | Michael Hanselmann | # in the future.
|
244 | bb44b1ae | Michael Hanselmann | return self.GetBashCommand(transport_cmd) |
245 | bb44b1ae | Michael Hanselmann | |
246 | bb44b1ae | Michael Hanselmann | def GetCommand(self): |
247 | bb44b1ae | Michael Hanselmann | """Returns the complete child process command.
|
248 | bb44b1ae | Michael Hanselmann |
|
249 | bb44b1ae | Michael Hanselmann | """
|
250 | bb44b1ae | Michael Hanselmann | transport_cmd = self._GetTransportCommand()
|
251 | bb44b1ae | Michael Hanselmann | |
252 | bb44b1ae | Michael Hanselmann | buf = StringIO() |
253 | bb44b1ae | Michael Hanselmann | |
254 | bb44b1ae | Michael Hanselmann | if self._opts.cmd_prefix: |
255 | bb44b1ae | Michael Hanselmann | buf.write(self._opts.cmd_prefix)
|
256 | bb44b1ae | Michael Hanselmann | buf.write(" ")
|
257 | bb44b1ae | Michael Hanselmann | |
258 | bb44b1ae | Michael Hanselmann | buf.write(utils.ShellQuoteArgs(transport_cmd)) |
259 | bb44b1ae | Michael Hanselmann | |
260 | bb44b1ae | Michael Hanselmann | if self._opts.cmd_suffix: |
261 | bb44b1ae | Michael Hanselmann | buf.write(" ")
|
262 | bb44b1ae | Michael Hanselmann | buf.write(self._opts.cmd_suffix)
|
263 | bb44b1ae | Michael Hanselmann | |
264 | bb44b1ae | Michael Hanselmann | return self.GetBashCommand(buf.getvalue()) |
265 | 34c9ee7b | Michael Hanselmann | |
266 | 34c9ee7b | Michael Hanselmann | |
267 | 34c9ee7b | Michael Hanselmann | def _VerifyListening(family, address, port): |
268 | 34c9ee7b | Michael Hanselmann | """Verify address given as listening address by socat.
|
269 | 34c9ee7b | Michael Hanselmann |
|
270 | 34c9ee7b | Michael Hanselmann | """
|
271 | 34c9ee7b | Michael Hanselmann | # TODO: Implement IPv6 support
|
272 | 34c9ee7b | Michael Hanselmann | if family != socket.AF_INET:
|
273 | 34c9ee7b | Michael Hanselmann | raise errors.GenericError("Address family %r not supported" % family) |
274 | 34c9ee7b | Michael Hanselmann | |
275 | 34c9ee7b | Michael Hanselmann | try:
|
276 | 34c9ee7b | Michael Hanselmann | packed_address = socket.inet_pton(family, address) |
277 | 34c9ee7b | Michael Hanselmann | except socket.error:
|
278 | 34c9ee7b | Michael Hanselmann | raise errors.GenericError("Invalid address %r for family %s" % |
279 | 34c9ee7b | Michael Hanselmann | (address, family)) |
280 | 34c9ee7b | Michael Hanselmann | |
281 | 34c9ee7b | Michael Hanselmann | return (socket.inet_ntop(family, packed_address), port)
|
282 | 34c9ee7b | Michael Hanselmann | |
283 | 34c9ee7b | Michael Hanselmann | |
284 | 34c9ee7b | Michael Hanselmann | class ChildIOProcessor(object): |
285 | f9323011 | Michael Hanselmann | def __init__(self, debug, status_file, logger, throughput_samples, exp_size): |
286 | 34c9ee7b | Michael Hanselmann | """Initializes this class.
|
287 | 34c9ee7b | Michael Hanselmann |
|
288 | 34c9ee7b | Michael Hanselmann | """
|
289 | 34c9ee7b | Michael Hanselmann | self._debug = debug
|
290 | 34c9ee7b | Michael Hanselmann | self._status_file = status_file
|
291 | 34c9ee7b | Michael Hanselmann | self._logger = logger
|
292 | 34c9ee7b | Michael Hanselmann | |
293 | 34c9ee7b | Michael Hanselmann | self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog)) |
294 | 34c9ee7b | Michael Hanselmann | for prog in PROG_ALL]) |
295 | 34c9ee7b | Michael Hanselmann | |
296 | c08d76f5 | Michael Hanselmann | self._dd_pid = None |
297 | c08d76f5 | Michael Hanselmann | self._dd_ready = False |
298 | c08d76f5 | Michael Hanselmann | self._dd_tp_samples = throughput_samples
|
299 | c08d76f5 | Michael Hanselmann | self._dd_progress = []
|
300 | c08d76f5 | Michael Hanselmann | |
301 | c08d76f5 | Michael Hanselmann | # Expected size of transferred data
|
302 | f9323011 | Michael Hanselmann | self._exp_size = exp_size
|
303 | c08d76f5 | Michael Hanselmann | |
304 | 34c9ee7b | Michael Hanselmann | def GetLineSplitter(self, prog): |
305 | 34c9ee7b | Michael Hanselmann | """Returns the line splitter for a program.
|
306 | 34c9ee7b | Michael Hanselmann |
|
307 | 34c9ee7b | Michael Hanselmann | """
|
308 | 34c9ee7b | Michael Hanselmann | return self._splitter[prog] |
309 | 34c9ee7b | Michael Hanselmann | |
310 | 34c9ee7b | Michael Hanselmann | def FlushAll(self): |
311 | 34c9ee7b | Michael Hanselmann | """Flushes all line splitters.
|
312 | 34c9ee7b | Michael Hanselmann |
|
313 | 34c9ee7b | Michael Hanselmann | """
|
314 | 34c9ee7b | Michael Hanselmann | for ls in self._splitter.itervalues(): |
315 | 34c9ee7b | Michael Hanselmann | ls.flush() |
316 | 34c9ee7b | Michael Hanselmann | |
317 | 34c9ee7b | Michael Hanselmann | def CloseAll(self): |
318 | 34c9ee7b | Michael Hanselmann | """Closes all line splitters.
|
319 | 34c9ee7b | Michael Hanselmann |
|
320 | 34c9ee7b | Michael Hanselmann | """
|
321 | 34c9ee7b | Michael Hanselmann | for ls in self._splitter.itervalues(): |
322 | 34c9ee7b | Michael Hanselmann | ls.close() |
323 | 34c9ee7b | Michael Hanselmann | self._splitter.clear()
|
324 | 34c9ee7b | Michael Hanselmann | |
325 | c08d76f5 | Michael Hanselmann | def NotifyDd(self): |
326 | c08d76f5 | Michael Hanselmann | """Tells dd(1) to write statistics.
|
327 | c08d76f5 | Michael Hanselmann |
|
328 | c08d76f5 | Michael Hanselmann | """
|
329 | c08d76f5 | Michael Hanselmann | if self._dd_pid is None: |
330 | c08d76f5 | Michael Hanselmann | # Can't notify
|
331 | c08d76f5 | Michael Hanselmann | return False |
332 | c08d76f5 | Michael Hanselmann | |
333 | c08d76f5 | Michael Hanselmann | if not self._dd_ready: |
334 | c08d76f5 | Michael Hanselmann | # There's a race condition between starting the program and sending
|
335 | c08d76f5 | Michael Hanselmann | # signals. The signal handler is only registered after some time, so we
|
336 | c08d76f5 | Michael Hanselmann | # have to check whether the program is ready. If it isn't, sending a
|
337 | c08d76f5 | Michael Hanselmann | # signal will invoke the default handler (and usually abort the program).
|
338 | c08d76f5 | Michael Hanselmann | if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL): |
339 | c08d76f5 | Michael Hanselmann | logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
|
340 | c08d76f5 | Michael Hanselmann | return False |
341 | c08d76f5 | Michael Hanselmann | |
342 | c08d76f5 | Michael Hanselmann | logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
|
343 | c08d76f5 | Michael Hanselmann | self._dd_ready = True |
344 | c08d76f5 | Michael Hanselmann | |
345 | c08d76f5 | Michael Hanselmann | logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid) |
346 | c08d76f5 | Michael Hanselmann | try:
|
347 | c08d76f5 | Michael Hanselmann | os.kill(self._dd_pid, DD_INFO_SIGNAL)
|
348 | c08d76f5 | Michael Hanselmann | except EnvironmentError, err: |
349 | c08d76f5 | Michael Hanselmann | if err.errno != errno.ESRCH:
|
350 | c08d76f5 | Michael Hanselmann | raise
|
351 | c08d76f5 | Michael Hanselmann | |
352 | c08d76f5 | Michael Hanselmann | # Process no longer exists
|
353 | 560cbec1 | Michael Hanselmann | logging.debug("dd exited")
|
354 | c08d76f5 | Michael Hanselmann | self._dd_pid = None |
355 | c08d76f5 | Michael Hanselmann | |
356 | c08d76f5 | Michael Hanselmann | return True |
357 | c08d76f5 | Michael Hanselmann | |
358 | 34c9ee7b | Michael Hanselmann | def _ProcessOutput(self, line, prog): |
359 | 34c9ee7b | Michael Hanselmann | """Takes care of child process output.
|
360 | 34c9ee7b | Michael Hanselmann |
|
361 | 34c9ee7b | Michael Hanselmann | @type line: string
|
362 | 34c9ee7b | Michael Hanselmann | @param line: Child output line
|
363 | 34c9ee7b | Michael Hanselmann | @type prog: number
|
364 | 34c9ee7b | Michael Hanselmann | @param prog: Program from which the line originates
|
365 | 34c9ee7b | Michael Hanselmann |
|
366 | 34c9ee7b | Michael Hanselmann | """
|
367 | 34c9ee7b | Michael Hanselmann | force_update = False
|
368 | 34c9ee7b | Michael Hanselmann | forward_line = line |
369 | 34c9ee7b | Michael Hanselmann | |
370 | 34c9ee7b | Michael Hanselmann | if prog == PROG_SOCAT:
|
371 | 34c9ee7b | Michael Hanselmann | level = None
|
372 | 34c9ee7b | Michael Hanselmann | parts = line.split(None, 4) |
373 | 34c9ee7b | Michael Hanselmann | |
374 | 34c9ee7b | Michael Hanselmann | if len(parts) == 5: |
375 | 34c9ee7b | Michael Hanselmann | (_, _, _, level, msg) = parts |
376 | 34c9ee7b | Michael Hanselmann | |
377 | 34c9ee7b | Michael Hanselmann | force_update = self._ProcessSocatOutput(self._status_file, level, msg) |
378 | 34c9ee7b | Michael Hanselmann | |
379 | 34c9ee7b | Michael Hanselmann | if self._debug or (level and level not in SOCAT_LOG_IGNORE): |
380 | 34c9ee7b | Michael Hanselmann | forward_line = "socat: %s %s" % (level, msg)
|
381 | 34c9ee7b | Michael Hanselmann | else:
|
382 | 34c9ee7b | Michael Hanselmann | forward_line = None
|
383 | 34c9ee7b | Michael Hanselmann | else:
|
384 | 34c9ee7b | Michael Hanselmann | forward_line = "socat: %s" % line
|
385 | 34c9ee7b | Michael Hanselmann | |
386 | c08d76f5 | Michael Hanselmann | elif prog == PROG_DD:
|
387 | c08d76f5 | Michael Hanselmann | (should_forward, force_update) = self._ProcessDdOutput(line)
|
388 | c08d76f5 | Michael Hanselmann | |
389 | c08d76f5 | Michael Hanselmann | if should_forward or self._debug: |
390 | c08d76f5 | Michael Hanselmann | forward_line = "dd: %s" % line
|
391 | c08d76f5 | Michael Hanselmann | else:
|
392 | c08d76f5 | Michael Hanselmann | forward_line = None
|
393 | c08d76f5 | Michael Hanselmann | |
394 | c08d76f5 | Michael Hanselmann | elif prog == PROG_DD_PID:
|
395 | c08d76f5 | Michael Hanselmann | if self._dd_pid: |
396 | c08d76f5 | Michael Hanselmann | raise RuntimeError("dd PID reported more than once") |
397 | c08d76f5 | Michael Hanselmann | logging.debug("Received dd PID %r", line)
|
398 | c08d76f5 | Michael Hanselmann | self._dd_pid = int(line) |
399 | c08d76f5 | Michael Hanselmann | forward_line = None
|
400 | c08d76f5 | Michael Hanselmann | |
401 | f9323011 | Michael Hanselmann | elif prog == PROG_EXP_SIZE:
|
402 | f9323011 | Michael Hanselmann | logging.debug("Received predicted size %r", line)
|
403 | f9323011 | Michael Hanselmann | forward_line = None
|
404 | f9323011 | Michael Hanselmann | |
405 | f9323011 | Michael Hanselmann | if line:
|
406 | f9323011 | Michael Hanselmann | try:
|
407 | f9323011 | Michael Hanselmann | exp_size = utils.BytesToMebibyte(int(line))
|
408 | f9323011 | Michael Hanselmann | except (ValueError, TypeError), err: |
409 | f9323011 | Michael Hanselmann | logging.error("Failed to convert predicted size %r to number: %s",
|
410 | f9323011 | Michael Hanselmann | line, err) |
411 | f9323011 | Michael Hanselmann | exp_size = None
|
412 | f9323011 | Michael Hanselmann | else:
|
413 | f9323011 | Michael Hanselmann | exp_size = None
|
414 | f9323011 | Michael Hanselmann | |
415 | f9323011 | Michael Hanselmann | self._exp_size = exp_size
|
416 | f9323011 | Michael Hanselmann | |
417 | 34c9ee7b | Michael Hanselmann | if forward_line:
|
418 | 34c9ee7b | Michael Hanselmann | self._logger.info(forward_line)
|
419 | 34c9ee7b | Michael Hanselmann | self._status_file.AddRecentOutput(forward_line)
|
420 | 34c9ee7b | Michael Hanselmann | |
421 | 34c9ee7b | Michael Hanselmann | self._status_file.Update(force_update)
|
422 | 34c9ee7b | Michael Hanselmann | |
423 | 34c9ee7b | Michael Hanselmann | @staticmethod
|
424 | 34c9ee7b | Michael Hanselmann | def _ProcessSocatOutput(status_file, level, msg): |
425 | 34c9ee7b | Michael Hanselmann | """Interprets socat log output.
|
426 | 34c9ee7b | Michael Hanselmann |
|
427 | 34c9ee7b | Michael Hanselmann | """
|
428 | 34c9ee7b | Michael Hanselmann | if level == SOCAT_LOG_NOTICE:
|
429 | 34c9ee7b | Michael Hanselmann | if status_file.GetListenPort() is None: |
430 | 34c9ee7b | Michael Hanselmann | # TODO: Maybe implement timeout to not listen forever
|
431 | 34c9ee7b | Michael Hanselmann | m = LISTENING_RE.match(msg) |
432 | 34c9ee7b | Michael Hanselmann | if m:
|
433 | 34c9ee7b | Michael Hanselmann | (_, port) = _VerifyListening(int(m.group("family")), |
434 | 34c9ee7b | Michael Hanselmann | m.group("address"),
|
435 | 34c9ee7b | Michael Hanselmann | int(m.group("port"))) |
436 | 34c9ee7b | Michael Hanselmann | |
437 | 34c9ee7b | Michael Hanselmann | status_file.SetListenPort(port) |
438 | 34c9ee7b | Michael Hanselmann | return True |
439 | 34c9ee7b | Michael Hanselmann | |
440 | 34c9ee7b | Michael Hanselmann | if not status_file.GetConnected(): |
441 | 34c9ee7b | Michael Hanselmann | m = TRANSFER_LOOP_RE.match(msg) |
442 | 34c9ee7b | Michael Hanselmann | if m:
|
443 | 53dbf14c | Michael Hanselmann | logging.debug("Connection established")
|
444 | 34c9ee7b | Michael Hanselmann | status_file.SetConnected() |
445 | 34c9ee7b | Michael Hanselmann | return True |
446 | 34c9ee7b | Michael Hanselmann | |
447 | 34c9ee7b | Michael Hanselmann | return False |
448 | c08d76f5 | Michael Hanselmann | |
449 | c08d76f5 | Michael Hanselmann | def _ProcessDdOutput(self, line): |
450 | c08d76f5 | Michael Hanselmann | """Interprets a line of dd(1)'s output.
|
451 | c08d76f5 | Michael Hanselmann |
|
452 | c08d76f5 | Michael Hanselmann | """
|
453 | c08d76f5 | Michael Hanselmann | m = DD_INFO_RE.match(line) |
454 | c08d76f5 | Michael Hanselmann | if m:
|
455 | c08d76f5 | Michael Hanselmann | seconds = float(m.group("seconds")) |
456 | c08d76f5 | Michael Hanselmann | mbytes = utils.BytesToMebibyte(int(m.group("bytes"))) |
457 | c08d76f5 | Michael Hanselmann | self._UpdateDdProgress(seconds, mbytes)
|
458 | c08d76f5 | Michael Hanselmann | return (False, True) |
459 | c08d76f5 | Michael Hanselmann | |
460 | c08d76f5 | Michael Hanselmann | m = DD_STDERR_IGNORE.match(line) |
461 | c08d76f5 | Michael Hanselmann | if m:
|
462 | c08d76f5 | Michael Hanselmann | # Ignore
|
463 | c08d76f5 | Michael Hanselmann | return (False, False) |
464 | c08d76f5 | Michael Hanselmann | |
465 | c08d76f5 | Michael Hanselmann | # Forward line
|
466 | c08d76f5 | Michael Hanselmann | return (True, False) |
467 | c08d76f5 | Michael Hanselmann | |
468 | c08d76f5 | Michael Hanselmann | def _UpdateDdProgress(self, seconds, mbytes): |
469 | c08d76f5 | Michael Hanselmann | """Updates the internal status variables for dd(1) progress.
|
470 | c08d76f5 | Michael Hanselmann |
|
471 | c08d76f5 | Michael Hanselmann | @type seconds: float
|
472 | c08d76f5 | Michael Hanselmann | @param seconds: Timestamp of this update
|
473 | c08d76f5 | Michael Hanselmann | @type mbytes: float
|
474 | c08d76f5 | Michael Hanselmann | @param mbytes: Total number of MiB transferred so far
|
475 | c08d76f5 | Michael Hanselmann |
|
476 | c08d76f5 | Michael Hanselmann | """
|
477 | c08d76f5 | Michael Hanselmann | # Add latest sample
|
478 | c08d76f5 | Michael Hanselmann | self._dd_progress.append((seconds, mbytes))
|
479 | c08d76f5 | Michael Hanselmann | |
480 | c08d76f5 | Michael Hanselmann | # Remove old samples
|
481 | c08d76f5 | Michael Hanselmann | del self._dd_progress[:-self._dd_tp_samples] |
482 | c08d76f5 | Michael Hanselmann | |
483 | c08d76f5 | Michael Hanselmann | # Calculate throughput
|
484 | c08d76f5 | Michael Hanselmann | throughput = _CalcThroughput(self._dd_progress)
|
485 | c08d76f5 | Michael Hanselmann | |
486 | c08d76f5 | Michael Hanselmann | # Calculate percent and ETA
|
487 | c08d76f5 | Michael Hanselmann | percent = None
|
488 | c08d76f5 | Michael Hanselmann | eta = None
|
489 | c08d76f5 | Michael Hanselmann | |
490 | c08d76f5 | Michael Hanselmann | if self._exp_size is not None: |
491 | c08d76f5 | Michael Hanselmann | if self._exp_size != 0: |
492 | c08d76f5 | Michael Hanselmann | percent = max(0, min(100, (100.0 * mbytes) / self._exp_size)) |
493 | c08d76f5 | Michael Hanselmann | |
494 | c08d76f5 | Michael Hanselmann | if throughput:
|
495 | c08d76f5 | Michael Hanselmann | eta = max(0, float(self._exp_size - mbytes) / throughput) |
496 | c08d76f5 | Michael Hanselmann | |
497 | c08d76f5 | Michael Hanselmann | self._status_file.SetProgress(mbytes, throughput, percent, eta)
|
498 | c08d76f5 | Michael Hanselmann | |
499 | c08d76f5 | Michael Hanselmann | |
500 | c08d76f5 | Michael Hanselmann | def _CalcThroughput(samples): |
501 | c08d76f5 | Michael Hanselmann | """Calculates the throughput in MiB/second.
|
502 | c08d76f5 | Michael Hanselmann |
|
503 | c08d76f5 | Michael Hanselmann | @type samples: sequence
|
504 | c08d76f5 | Michael Hanselmann | @param samples: List of samples, each consisting of a (timestamp, mbytes)
|
505 | c08d76f5 | Michael Hanselmann | tuple
|
506 | c08d76f5 | Michael Hanselmann | @rtype: float or None
|
507 | c08d76f5 | Michael Hanselmann | @return: Throughput in MiB/second
|
508 | c08d76f5 | Michael Hanselmann |
|
509 | c08d76f5 | Michael Hanselmann | """
|
510 | c08d76f5 | Michael Hanselmann | if len(samples) < 2: |
511 | c08d76f5 | Michael Hanselmann | # Can't calculate throughput
|
512 | c08d76f5 | Michael Hanselmann | return None |
513 | c08d76f5 | Michael Hanselmann | |
514 | c08d76f5 | Michael Hanselmann | (start_time, start_mbytes) = samples[0]
|
515 | c08d76f5 | Michael Hanselmann | (end_time, end_mbytes) = samples[-1]
|
516 | c08d76f5 | Michael Hanselmann | |
517 | c08d76f5 | Michael Hanselmann | return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time) |