root / lib / impexpd / __init__.py @ eaa4c57c
History | View | Annotate | Download (16.2 kB)
1 | bb44b1ae | Michael Hanselmann | #
|
---|---|---|---|
2 | bb44b1ae | Michael Hanselmann | #
|
3 | bb44b1ae | Michael Hanselmann | |
4 | bb44b1ae | Michael Hanselmann | # Copyright (C) 2010 Google Inc.
|
5 | bb44b1ae | Michael Hanselmann | #
|
6 | bb44b1ae | Michael Hanselmann | # This program is free software; you can redistribute it and/or modify
|
7 | bb44b1ae | Michael Hanselmann | # it under the terms of the GNU General Public License as published by
|
8 | bb44b1ae | Michael Hanselmann | # the Free Software Foundation; either version 2 of the License, or
|
9 | bb44b1ae | Michael Hanselmann | # (at your option) any later version.
|
10 | bb44b1ae | Michael Hanselmann | #
|
11 | bb44b1ae | Michael Hanselmann | # This program is distributed in the hope that it will be useful, but
|
12 | bb44b1ae | Michael Hanselmann | # WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 | bb44b1ae | Michael Hanselmann | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 | bb44b1ae | Michael Hanselmann | # General Public License for more details.
|
15 | bb44b1ae | Michael Hanselmann | #
|
16 | bb44b1ae | Michael Hanselmann | # You should have received a copy of the GNU General Public License
|
17 | bb44b1ae | Michael Hanselmann | # along with this program; if not, write to the Free Software
|
18 | bb44b1ae | Michael Hanselmann | # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 | bb44b1ae | Michael Hanselmann | # 02110-1301, USA.
|
20 | bb44b1ae | Michael Hanselmann | |
21 | bb44b1ae | Michael Hanselmann | |
22 | bb44b1ae | Michael Hanselmann | """Classes and functions for import/export daemon.
|
23 | bb44b1ae | Michael Hanselmann |
|
24 | bb44b1ae | Michael Hanselmann | """
|
25 | bb44b1ae | Michael Hanselmann | |
26 | c08d76f5 | Michael Hanselmann | import os |
27 | 34c9ee7b | Michael Hanselmann | import re |
28 | 34c9ee7b | Michael Hanselmann | import socket |
29 | 53dbf14c | Michael Hanselmann | import logging |
30 | c08d76f5 | Michael Hanselmann | import signal |
31 | c08d76f5 | Michael Hanselmann | import errno |
32 | c08d76f5 | Michael Hanselmann | import time |
33 | bb44b1ae | Michael Hanselmann | from cStringIO import StringIO |
34 | bb44b1ae | Michael Hanselmann | |
35 | bb44b1ae | Michael Hanselmann | from ganeti import constants |
36 | bb44b1ae | Michael Hanselmann | from ganeti import errors |
37 | bb44b1ae | Michael Hanselmann | from ganeti import utils |
38 | 58bb385c | Michael Hanselmann | from ganeti import netutils |
39 | bb44b1ae | Michael Hanselmann | |
40 | bb44b1ae | Michael Hanselmann | |
41 | 34c9ee7b | Michael Hanselmann | #: Used to recognize point at which socat(1) starts to listen on its socket.
|
42 | 34c9ee7b | Michael Hanselmann | #: The local address is required for the remote peer to connect (in particular
|
43 | 34c9ee7b | Michael Hanselmann | #: the port number).
|
44 | 34c9ee7b | Michael Hanselmann | LISTENING_RE = re.compile(r"^listening on\s+"
|
45 | 34c9ee7b | Michael Hanselmann | r"AF=(?P<family>\d+)\s+"
|
46 | 34c9ee7b | Michael Hanselmann | r"(?P<address>.+):(?P<port>\d+)$", re.I)
|
47 | 34c9ee7b | Michael Hanselmann | |
48 | 34c9ee7b | Michael Hanselmann | #: Used to recognize point at which socat(1) is sending data over the wire
|
49 | 34c9ee7b | Michael Hanselmann | TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
|
50 | 34c9ee7b | Michael Hanselmann | re.I) |
51 | 34c9ee7b | Michael Hanselmann | |
52 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_DEBUG = "D"
|
53 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_INFO = "I"
|
54 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_NOTICE = "N"
|
55 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_WARNING = "W"
|
56 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_ERROR = "E"
|
57 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_FATAL = "F"
|
58 | 34c9ee7b | Michael Hanselmann | |
59 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_IGNORE = frozenset([
|
60 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_DEBUG, |
61 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_INFO, |
62 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_NOTICE, |
63 | 34c9ee7b | Michael Hanselmann | ]) |
64 | 34c9ee7b | Michael Hanselmann | |
65 | c08d76f5 | Michael Hanselmann | #: Used to parse GNU dd(1) statistics
|
66 | c08d76f5 | Michael Hanselmann | DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
|
67 | c08d76f5 | Michael Hanselmann | r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
|
68 | c08d76f5 | Michael Hanselmann | |
69 | c08d76f5 | Michael Hanselmann | #: Used to ignore "N+N records in/out" on dd(1)'s stderr
|
70 | c08d76f5 | Michael Hanselmann | DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
|
71 | c08d76f5 | Michael Hanselmann | |
72 | c08d76f5 | Michael Hanselmann | #: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
|
73 | c08d76f5 | Michael Hanselmann | #: unavailable and SIGUSR1 is used instead)
|
74 | c08d76f5 | Michael Hanselmann | DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) |
75 | c08d76f5 | Michael Hanselmann | |
76 | bb44b1ae | Michael Hanselmann | #: Buffer size: at most this many bytes are transferred at once
|
77 | bb44b1ae | Michael Hanselmann | BUFSIZE = 1024 * 1024 |
78 | bb44b1ae | Michael Hanselmann | |
79 | bb44b1ae | Michael Hanselmann | # Common options for socat
|
80 | bb44b1ae | Michael Hanselmann | SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] |
81 | 971bbd84 | Michael Hanselmann | SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1", |
82 | 971bbd84 | Michael Hanselmann | "cipher=%s" % constants.OPENSSL_CIPHERS]
|
83 | bb44b1ae | Michael Hanselmann | |
84 | e90739d6 | Michael Hanselmann | if constants.SOCAT_USE_COMPRESS:
|
85 | e90739d6 | Michael Hanselmann | # Disables all compression in by OpenSSL. Only supported in patched versions
|
86 | e90739d6 | Michael Hanselmann | # of socat (as of November 2010). See INSTALL for more information.
|
87 | e90739d6 | Michael Hanselmann | SOCAT_OPENSSL_OPTS.append("compress=none")
|
88 | e90739d6 | Michael Hanselmann | |
89 | 0559f745 | Michael Hanselmann | SOCAT_OPTION_MAXLEN = 400
|
90 | 0559f745 | Michael Hanselmann | |
91 | 34c9ee7b | Michael Hanselmann | (PROG_OTHER, |
92 | c08d76f5 | Michael Hanselmann | PROG_SOCAT, |
93 | c08d76f5 | Michael Hanselmann | PROG_DD, |
94 | f9323011 | Michael Hanselmann | PROG_DD_PID, |
95 | f9323011 | Michael Hanselmann | PROG_EXP_SIZE) = range(1, 6) |
96 | 34c9ee7b | Michael Hanselmann | PROG_ALL = frozenset([
|
97 | 34c9ee7b | Michael Hanselmann | PROG_OTHER, |
98 | 34c9ee7b | Michael Hanselmann | PROG_SOCAT, |
99 | c08d76f5 | Michael Hanselmann | PROG_DD, |
100 | c08d76f5 | Michael Hanselmann | PROG_DD_PID, |
101 | f9323011 | Michael Hanselmann | PROG_EXP_SIZE, |
102 | 34c9ee7b | Michael Hanselmann | ]) |
103 | 34c9ee7b | Michael Hanselmann | |
104 | bb44b1ae | Michael Hanselmann | |
105 | bb44b1ae | Michael Hanselmann | class CommandBuilder(object): |
106 | c08d76f5 | Michael Hanselmann | def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd): |
107 | bb44b1ae | Michael Hanselmann | """Initializes this class.
|
108 | bb44b1ae | Michael Hanselmann |
|
109 | bb44b1ae | Michael Hanselmann | @param mode: Daemon mode (import or export)
|
110 | bb44b1ae | Michael Hanselmann | @param opts: Options object
|
111 | bb44b1ae | Michael Hanselmann | @type socat_stderr_fd: int
|
112 | bb44b1ae | Michael Hanselmann | @param socat_stderr_fd: File descriptor socat should write its stderr to
|
113 | c08d76f5 | Michael Hanselmann | @type dd_stderr_fd: int
|
114 | c08d76f5 | Michael Hanselmann | @param dd_stderr_fd: File descriptor dd should write its stderr to
|
115 | c08d76f5 | Michael Hanselmann | @type dd_pid_fd: int
|
116 | c08d76f5 | Michael Hanselmann | @param dd_pid_fd: File descriptor the child should write dd's PID to
|
117 | bb44b1ae | Michael Hanselmann |
|
118 | bb44b1ae | Michael Hanselmann | """
|
119 | bb44b1ae | Michael Hanselmann | self._opts = opts
|
120 | bb44b1ae | Michael Hanselmann | self._mode = mode
|
121 | bb44b1ae | Michael Hanselmann | self._socat_stderr_fd = socat_stderr_fd
|
122 | c08d76f5 | Michael Hanselmann | self._dd_stderr_fd = dd_stderr_fd
|
123 | c08d76f5 | Michael Hanselmann | self._dd_pid_fd = dd_pid_fd
|
124 | bb44b1ae | Michael Hanselmann | |
125 | 1d3dfa29 | Michael Hanselmann | assert (self._opts.magic is None or |
126 | 1d3dfa29 | Michael Hanselmann | constants.IE_MAGIC_RE.match(self._opts.magic))
|
127 | 1d3dfa29 | Michael Hanselmann | |
128 | bb44b1ae | Michael Hanselmann | @staticmethod
|
129 | bb44b1ae | Michael Hanselmann | def GetBashCommand(cmd): |
130 | bb44b1ae | Michael Hanselmann | """Prepares a command to be run in Bash.
|
131 | bb44b1ae | Michael Hanselmann |
|
132 | bb44b1ae | Michael Hanselmann | """
|
133 | bb44b1ae | Michael Hanselmann | return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd] |
134 | bb44b1ae | Michael Hanselmann | |
135 | bb44b1ae | Michael Hanselmann | def _GetSocatCommand(self): |
136 | bb44b1ae | Michael Hanselmann | """Returns the socat command.
|
137 | bb44b1ae | Michael Hanselmann |
|
138 | bb44b1ae | Michael Hanselmann | """
|
139 | bb44b1ae | Michael Hanselmann | common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ |
140 | bb44b1ae | Michael Hanselmann | "key=%s" % self._opts.key, |
141 | bb44b1ae | Michael Hanselmann | "cert=%s" % self._opts.cert, |
142 | bb44b1ae | Michael Hanselmann | "cafile=%s" % self._opts.ca, |
143 | bb44b1ae | Michael Hanselmann | ] |
144 | bb44b1ae | Michael Hanselmann | |
145 | bb44b1ae | Michael Hanselmann | if self._opts.bind is not None: |
146 | bb44b1ae | Michael Hanselmann | common_addr_opts.append("bind=%s" % self._opts.bind) |
147 | bb44b1ae | Michael Hanselmann | |
148 | 58bb385c | Michael Hanselmann | assert not (self._opts.ipv4 and self._opts.ipv6) |
149 | 58bb385c | Michael Hanselmann | |
150 | 58bb385c | Michael Hanselmann | if self._opts.ipv4: |
151 | 58bb385c | Michael Hanselmann | common_addr_opts.append("pf=ipv4")
|
152 | 58bb385c | Michael Hanselmann | elif self._opts.ipv6: |
153 | 58bb385c | Michael Hanselmann | common_addr_opts.append("pf=ipv6")
|
154 | 58bb385c | Michael Hanselmann | |
155 | bb44b1ae | Michael Hanselmann | if self._mode == constants.IEM_IMPORT: |
156 | bb44b1ae | Michael Hanselmann | if self._opts.port is None: |
157 | bb44b1ae | Michael Hanselmann | port = 0
|
158 | bb44b1ae | Michael Hanselmann | else:
|
159 | bb44b1ae | Michael Hanselmann | port = self._opts.port
|
160 | bb44b1ae | Michael Hanselmann | |
161 | bb44b1ae | Michael Hanselmann | addr1 = [ |
162 | bb44b1ae | Michael Hanselmann | "OPENSSL-LISTEN:%s" % port,
|
163 | bb44b1ae | Michael Hanselmann | "reuseaddr",
|
164 | bb44b1ae | Michael Hanselmann | |
165 | bb44b1ae | Michael Hanselmann | # Retry to listen if connection wasn't established successfully, up to
|
166 | bb44b1ae | Michael Hanselmann | # 100 times a second. Note that this still leaves room for DoS attacks.
|
167 | bb44b1ae | Michael Hanselmann | "forever",
|
168 | bb44b1ae | Michael Hanselmann | "intervall=0.01",
|
169 | bb44b1ae | Michael Hanselmann | ] + common_addr_opts |
170 | bb44b1ae | Michael Hanselmann | addr2 = ["stdout"]
|
171 | bb44b1ae | Michael Hanselmann | |
172 | bb44b1ae | Michael Hanselmann | elif self._mode == constants.IEM_EXPORT: |
173 | 58bb385c | Michael Hanselmann | if self._opts.host and netutils.IP6Address.IsValid(self._opts.host): |
174 | 58bb385c | Michael Hanselmann | host = "[%s]" % self._opts.host |
175 | 58bb385c | Michael Hanselmann | else:
|
176 | 58bb385c | Michael Hanselmann | host = self._opts.host
|
177 | 58bb385c | Michael Hanselmann | |
178 | bb44b1ae | Michael Hanselmann | addr1 = ["stdin"]
|
179 | bb44b1ae | Michael Hanselmann | addr2 = [ |
180 | 58bb385c | Michael Hanselmann | "OPENSSL:%s:%s" % (host, self._opts.port), |
181 | bb44b1ae | Michael Hanselmann | |
182 | bb44b1ae | Michael Hanselmann | # How long to wait per connection attempt
|
183 | bb44b1ae | Michael Hanselmann | "connect-timeout=%s" % self._opts.connect_timeout, |
184 | bb44b1ae | Michael Hanselmann | |
185 | bb44b1ae | Michael Hanselmann | # Retry a few times before giving up to connect (once per second)
|
186 | bb44b1ae | Michael Hanselmann | "retry=%s" % self._opts.connect_retries, |
187 | bb44b1ae | Michael Hanselmann | "intervall=1",
|
188 | bb44b1ae | Michael Hanselmann | ] + common_addr_opts |
189 | bb44b1ae | Michael Hanselmann | |
190 | bb44b1ae | Michael Hanselmann | else:
|
191 | bb44b1ae | Michael Hanselmann | raise errors.GenericError("Invalid mode '%s'" % self._mode) |
192 | bb44b1ae | Michael Hanselmann | |
193 | bb44b1ae | Michael Hanselmann | for i in [addr1, addr2]: |
194 | bb44b1ae | Michael Hanselmann | for value in i: |
195 | 0559f745 | Michael Hanselmann | if len(value) > SOCAT_OPTION_MAXLEN: |
196 | 0559f745 | Michael Hanselmann | raise errors.GenericError("Socat option longer than %s" |
197 | 0559f745 | Michael Hanselmann | " characters: %r" %
|
198 | 0559f745 | Michael Hanselmann | (SOCAT_OPTION_MAXLEN, value)) |
199 | bb44b1ae | Michael Hanselmann | if "," in value: |
200 | bb44b1ae | Michael Hanselmann | raise errors.GenericError("Comma not allowed in socat option" |
201 | bb44b1ae | Michael Hanselmann | " value: %r" % value)
|
202 | bb44b1ae | Michael Hanselmann | |
203 | bb44b1ae | Michael Hanselmann | return [
|
204 | bb44b1ae | Michael Hanselmann | constants.SOCAT_PATH, |
205 | bb44b1ae | Michael Hanselmann | |
206 | bb44b1ae | Michael Hanselmann | # Log to stderr
|
207 | bb44b1ae | Michael Hanselmann | "-ls",
|
208 | bb44b1ae | Michael Hanselmann | |
209 | bb44b1ae | Michael Hanselmann | # Log level
|
210 | bb44b1ae | Michael Hanselmann | "-d", "-d", |
211 | bb44b1ae | Michael Hanselmann | |
212 | bb44b1ae | Michael Hanselmann | # Buffer size
|
213 | bb44b1ae | Michael Hanselmann | "-b%s" % BUFSIZE,
|
214 | bb44b1ae | Michael Hanselmann | |
215 | bb44b1ae | Michael Hanselmann | # Unidirectional mode, the first address is only used for reading, and the
|
216 | bb44b1ae | Michael Hanselmann | # second address is only used for writing
|
217 | bb44b1ae | Michael Hanselmann | "-u",
|
218 | bb44b1ae | Michael Hanselmann | |
219 | bb44b1ae | Michael Hanselmann | ",".join(addr1), ",".join(addr2) |
220 | bb44b1ae | Michael Hanselmann | ] |
221 | bb44b1ae | Michael Hanselmann | |
222 | 1d3dfa29 | Michael Hanselmann | def _GetMagicCommand(self): |
223 | 1d3dfa29 | Michael Hanselmann | """Returns the command to read/write the magic value.
|
224 | 1d3dfa29 | Michael Hanselmann |
|
225 | 1d3dfa29 | Michael Hanselmann | """
|
226 | 1d3dfa29 | Michael Hanselmann | if not self._opts.magic: |
227 | 1d3dfa29 | Michael Hanselmann | return None |
228 | 1d3dfa29 | Michael Hanselmann | |
229 | 1d3dfa29 | Michael Hanselmann | # Prefix to ensure magic isn't interpreted as option to "echo"
|
230 | 1d3dfa29 | Michael Hanselmann | magic = "M=%s" % self._opts.magic |
231 | 1d3dfa29 | Michael Hanselmann | |
232 | 1d3dfa29 | Michael Hanselmann | cmd = StringIO() |
233 | 1d3dfa29 | Michael Hanselmann | |
234 | 1d3dfa29 | Michael Hanselmann | if self._mode == constants.IEM_IMPORT: |
235 | 1d3dfa29 | Michael Hanselmann | cmd.write("{ ")
|
236 | 1d3dfa29 | Michael Hanselmann | cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"])) |
237 | 1d3dfa29 | Michael Hanselmann | cmd.write(" && ")
|
238 | 1d3dfa29 | Michael Hanselmann | cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
|
239 | 1d3dfa29 | Michael Hanselmann | cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch")) |
240 | 1d3dfa29 | Michael Hanselmann | cmd.write(" exit 1;")
|
241 | 1d3dfa29 | Michael Hanselmann | cmd.write("fi;")
|
242 | 1d3dfa29 | Michael Hanselmann | cmd.write(" }")
|
243 | 1d3dfa29 | Michael Hanselmann | |
244 | 1d3dfa29 | Michael Hanselmann | elif self._mode == constants.IEM_EXPORT: |
245 | 1d3dfa29 | Michael Hanselmann | cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic])) |
246 | 1d3dfa29 | Michael Hanselmann | |
247 | 1d3dfa29 | Michael Hanselmann | else:
|
248 | 1d3dfa29 | Michael Hanselmann | raise errors.GenericError("Invalid mode '%s'" % self._mode) |
249 | 1d3dfa29 | Michael Hanselmann | |
250 | 1d3dfa29 | Michael Hanselmann | return cmd.getvalue()
|
251 | 1d3dfa29 | Michael Hanselmann | |
252 | fbb6b864 | Michael Hanselmann | def _GetDdCommand(self): |
253 | fbb6b864 | Michael Hanselmann | """Returns the command for measuring throughput.
|
254 | bb44b1ae | Michael Hanselmann |
|
255 | bb44b1ae | Michael Hanselmann | """
|
256 | c08d76f5 | Michael Hanselmann | dd_cmd = StringIO() |
257 | 1d3dfa29 | Michael Hanselmann | |
258 | 1d3dfa29 | Michael Hanselmann | magic_cmd = self._GetMagicCommand()
|
259 | 1d3dfa29 | Michael Hanselmann | if magic_cmd:
|
260 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write("{ ")
|
261 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write(magic_cmd) |
262 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write(" && ")
|
263 | 1d3dfa29 | Michael Hanselmann | |
264 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write("{ ")
|
265 | 2ed0e208 | Iustin Pop | # Setting LC_ALL since we want to parse the output and explicitly
|
266 | 2ed0e208 | Iustin Pop | # redirecting stdin, as the background process (dd) would have
|
267 | 2ed0e208 | Iustin Pop | # /dev/null as stdin otherwise
|
268 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
|
269 | c08d76f5 | Michael Hanselmann | (BUFSIZE, self._dd_stderr_fd))
|
270 | c08d76f5 | Michael Hanselmann | # Send PID to daemon
|
271 | c08d76f5 | Michael Hanselmann | dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd) |
272 | c08d76f5 | Michael Hanselmann | # And wait for dd
|
273 | c08d76f5 | Michael Hanselmann | dd_cmd.write(" wait $pid;")
|
274 | c08d76f5 | Michael Hanselmann | dd_cmd.write(" }")
|
275 | 1d3dfa29 | Michael Hanselmann | |
276 | 1d3dfa29 | Michael Hanselmann | if magic_cmd:
|
277 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write(" }")
|
278 | 1d3dfa29 | Michael Hanselmann | |
279 | fbb6b864 | Michael Hanselmann | return dd_cmd.getvalue()
|
280 | fbb6b864 | Michael Hanselmann | |
281 | fbb6b864 | Michael Hanselmann | def _GetTransportCommand(self): |
282 | fbb6b864 | Michael Hanselmann | """Returns the command for the transport part of the daemon.
|
283 | fbb6b864 | Michael Hanselmann |
|
284 | fbb6b864 | Michael Hanselmann | """
|
285 | fbb6b864 | Michael Hanselmann | socat_cmd = ("%s 2>&%d" %
|
286 | fbb6b864 | Michael Hanselmann | (utils.ShellQuoteArgs(self._GetSocatCommand()),
|
287 | fbb6b864 | Michael Hanselmann | self._socat_stderr_fd))
|
288 | fbb6b864 | Michael Hanselmann | dd_cmd = self._GetDdCommand()
|
289 | c08d76f5 | Michael Hanselmann | |
290 | bb44b1ae | Michael Hanselmann | compr = self._opts.compress
|
291 | bb44b1ae | Michael Hanselmann | |
292 | bb44b1ae | Michael Hanselmann | assert compr in constants.IEC_ALL |
293 | bb44b1ae | Michael Hanselmann | |
294 | fbb6b864 | Michael Hanselmann | parts = [] |
295 | fbb6b864 | Michael Hanselmann | |
296 | bb44b1ae | Michael Hanselmann | if self._mode == constants.IEM_IMPORT: |
297 | fbb6b864 | Michael Hanselmann | parts.append(socat_cmd) |
298 | fbb6b864 | Michael Hanselmann | |
299 | bb44b1ae | Michael Hanselmann | if compr == constants.IEC_GZIP:
|
300 | fbb6b864 | Michael Hanselmann | parts.append("gunzip -c")
|
301 | fbb6b864 | Michael Hanselmann | |
302 | fbb6b864 | Michael Hanselmann | parts.append(dd_cmd) |
303 | c08d76f5 | Michael Hanselmann | |
304 | bb44b1ae | Michael Hanselmann | elif self._mode == constants.IEM_EXPORT: |
305 | fbb6b864 | Michael Hanselmann | parts.append(dd_cmd) |
306 | fbb6b864 | Michael Hanselmann | |
307 | bb44b1ae | Michael Hanselmann | if compr == constants.IEC_GZIP:
|
308 | fbb6b864 | Michael Hanselmann | parts.append("gzip -c")
|
309 | fbb6b864 | Michael Hanselmann | |
310 | fbb6b864 | Michael Hanselmann | parts.append(socat_cmd) |
311 | c08d76f5 | Michael Hanselmann | |
312 | bb44b1ae | Michael Hanselmann | else:
|
313 | bb44b1ae | Michael Hanselmann | raise errors.GenericError("Invalid mode '%s'" % self._mode) |
314 | bb44b1ae | Michael Hanselmann | |
315 | bb44b1ae | Michael Hanselmann | # TODO: Run transport as separate user
|
316 | bb44b1ae | Michael Hanselmann | # The transport uses its own shell to simplify running it as a separate user
|
317 | bb44b1ae | Michael Hanselmann | # in the future.
|
318 | fbb6b864 | Michael Hanselmann | return self.GetBashCommand(" | ".join(parts)) |
319 | bb44b1ae | Michael Hanselmann | |
320 | bb44b1ae | Michael Hanselmann | def GetCommand(self): |
321 | bb44b1ae | Michael Hanselmann | """Returns the complete child process command.
|
322 | bb44b1ae | Michael Hanselmann |
|
323 | bb44b1ae | Michael Hanselmann | """
|
324 | bb44b1ae | Michael Hanselmann | transport_cmd = self._GetTransportCommand()
|
325 | bb44b1ae | Michael Hanselmann | |
326 | bb44b1ae | Michael Hanselmann | buf = StringIO() |
327 | bb44b1ae | Michael Hanselmann | |
328 | bb44b1ae | Michael Hanselmann | if self._opts.cmd_prefix: |
329 | bb44b1ae | Michael Hanselmann | buf.write(self._opts.cmd_prefix)
|
330 | bb44b1ae | Michael Hanselmann | buf.write(" ")
|
331 | bb44b1ae | Michael Hanselmann | |
332 | bb44b1ae | Michael Hanselmann | buf.write(utils.ShellQuoteArgs(transport_cmd)) |
333 | bb44b1ae | Michael Hanselmann | |
334 | bb44b1ae | Michael Hanselmann | if self._opts.cmd_suffix: |
335 | bb44b1ae | Michael Hanselmann | buf.write(" ")
|
336 | bb44b1ae | Michael Hanselmann | buf.write(self._opts.cmd_suffix)
|
337 | bb44b1ae | Michael Hanselmann | |
338 | bb44b1ae | Michael Hanselmann | return self.GetBashCommand(buf.getvalue()) |
339 | 34c9ee7b | Michael Hanselmann | |
340 | 34c9ee7b | Michael Hanselmann | |
341 | 34c9ee7b | Michael Hanselmann | def _VerifyListening(family, address, port): |
342 | 34c9ee7b | Michael Hanselmann | """Verify address given as listening address by socat.
|
343 | 34c9ee7b | Michael Hanselmann |
|
344 | 34c9ee7b | Michael Hanselmann | """
|
345 | 58bb385c | Michael Hanselmann | if family not in (socket.AF_INET, socket.AF_INET6): |
346 | 34c9ee7b | Michael Hanselmann | raise errors.GenericError("Address family %r not supported" % family) |
347 | 34c9ee7b | Michael Hanselmann | |
348 | 58bb385c | Michael Hanselmann | if (family == socket.AF_INET6 and address.startswith("[") and |
349 | 58bb385c | Michael Hanselmann | address.endswith("]")):
|
350 | 58bb385c | Michael Hanselmann | address = address.lstrip("[").rstrip("]") |
351 | 58bb385c | Michael Hanselmann | |
352 | 34c9ee7b | Michael Hanselmann | try:
|
353 | 34c9ee7b | Michael Hanselmann | packed_address = socket.inet_pton(family, address) |
354 | 34c9ee7b | Michael Hanselmann | except socket.error:
|
355 | 34c9ee7b | Michael Hanselmann | raise errors.GenericError("Invalid address %r for family %s" % |
356 | 34c9ee7b | Michael Hanselmann | (address, family)) |
357 | 34c9ee7b | Michael Hanselmann | |
358 | 34c9ee7b | Michael Hanselmann | return (socket.inet_ntop(family, packed_address), port)
|
359 | 34c9ee7b | Michael Hanselmann | |
360 | 34c9ee7b | Michael Hanselmann | |
361 | 34c9ee7b | Michael Hanselmann | class ChildIOProcessor(object): |
362 | f9323011 | Michael Hanselmann | def __init__(self, debug, status_file, logger, throughput_samples, exp_size): |
363 | 34c9ee7b | Michael Hanselmann | """Initializes this class.
|
364 | 34c9ee7b | Michael Hanselmann |
|
365 | 34c9ee7b | Michael Hanselmann | """
|
366 | 34c9ee7b | Michael Hanselmann | self._debug = debug
|
367 | 34c9ee7b | Michael Hanselmann | self._status_file = status_file
|
368 | 34c9ee7b | Michael Hanselmann | self._logger = logger
|
369 | 34c9ee7b | Michael Hanselmann | |
370 | 34c9ee7b | Michael Hanselmann | self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog)) |
371 | 34c9ee7b | Michael Hanselmann | for prog in PROG_ALL]) |
372 | 34c9ee7b | Michael Hanselmann | |
373 | c08d76f5 | Michael Hanselmann | self._dd_pid = None |
374 | c08d76f5 | Michael Hanselmann | self._dd_ready = False |
375 | c08d76f5 | Michael Hanselmann | self._dd_tp_samples = throughput_samples
|
376 | c08d76f5 | Michael Hanselmann | self._dd_progress = []
|
377 | c08d76f5 | Michael Hanselmann | |
378 | c08d76f5 | Michael Hanselmann | # Expected size of transferred data
|
379 | f9323011 | Michael Hanselmann | self._exp_size = exp_size
|
380 | c08d76f5 | Michael Hanselmann | |
381 | 34c9ee7b | Michael Hanselmann | def GetLineSplitter(self, prog): |
382 | 34c9ee7b | Michael Hanselmann | """Returns the line splitter for a program.
|
383 | 34c9ee7b | Michael Hanselmann |
|
384 | 34c9ee7b | Michael Hanselmann | """
|
385 | 34c9ee7b | Michael Hanselmann | return self._splitter[prog] |
386 | 34c9ee7b | Michael Hanselmann | |
387 | 34c9ee7b | Michael Hanselmann | def FlushAll(self): |
388 | 34c9ee7b | Michael Hanselmann | """Flushes all line splitters.
|
389 | 34c9ee7b | Michael Hanselmann |
|
390 | 34c9ee7b | Michael Hanselmann | """
|
391 | 34c9ee7b | Michael Hanselmann | for ls in self._splitter.itervalues(): |
392 | 34c9ee7b | Michael Hanselmann | ls.flush() |
393 | 34c9ee7b | Michael Hanselmann | |
394 | 34c9ee7b | Michael Hanselmann | def CloseAll(self): |
395 | 34c9ee7b | Michael Hanselmann | """Closes all line splitters.
|
396 | 34c9ee7b | Michael Hanselmann |
|
397 | 34c9ee7b | Michael Hanselmann | """
|
398 | 34c9ee7b | Michael Hanselmann | for ls in self._splitter.itervalues(): |
399 | 34c9ee7b | Michael Hanselmann | ls.close() |
400 | 34c9ee7b | Michael Hanselmann | self._splitter.clear()
|
401 | 34c9ee7b | Michael Hanselmann | |
402 | c08d76f5 | Michael Hanselmann | def NotifyDd(self): |
403 | c08d76f5 | Michael Hanselmann | """Tells dd(1) to write statistics.
|
404 | c08d76f5 | Michael Hanselmann |
|
405 | c08d76f5 | Michael Hanselmann | """
|
406 | c08d76f5 | Michael Hanselmann | if self._dd_pid is None: |
407 | c08d76f5 | Michael Hanselmann | # Can't notify
|
408 | c08d76f5 | Michael Hanselmann | return False |
409 | c08d76f5 | Michael Hanselmann | |
410 | c08d76f5 | Michael Hanselmann | if not self._dd_ready: |
411 | c08d76f5 | Michael Hanselmann | # There's a race condition between starting the program and sending
|
412 | c08d76f5 | Michael Hanselmann | # signals. The signal handler is only registered after some time, so we
|
413 | c08d76f5 | Michael Hanselmann | # have to check whether the program is ready. If it isn't, sending a
|
414 | c08d76f5 | Michael Hanselmann | # signal will invoke the default handler (and usually abort the program).
|
415 | c08d76f5 | Michael Hanselmann | if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL): |
416 | c08d76f5 | Michael Hanselmann | logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
|
417 | c08d76f5 | Michael Hanselmann | return False |
418 | c08d76f5 | Michael Hanselmann | |
419 | c08d76f5 | Michael Hanselmann | logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
|
420 | c08d76f5 | Michael Hanselmann | self._dd_ready = True |
421 | c08d76f5 | Michael Hanselmann | |
422 | c08d76f5 | Michael Hanselmann | logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid) |
423 | c08d76f5 | Michael Hanselmann | try:
|
424 | c08d76f5 | Michael Hanselmann | os.kill(self._dd_pid, DD_INFO_SIGNAL)
|
425 | c08d76f5 | Michael Hanselmann | except EnvironmentError, err: |
426 | c08d76f5 | Michael Hanselmann | if err.errno != errno.ESRCH:
|
427 | c08d76f5 | Michael Hanselmann | raise
|
428 | c08d76f5 | Michael Hanselmann | |
429 | c08d76f5 | Michael Hanselmann | # Process no longer exists
|
430 | 560cbec1 | Michael Hanselmann | logging.debug("dd exited")
|
431 | c08d76f5 | Michael Hanselmann | self._dd_pid = None |
432 | c08d76f5 | Michael Hanselmann | |
433 | c08d76f5 | Michael Hanselmann | return True |
434 | c08d76f5 | Michael Hanselmann | |
435 | 34c9ee7b | Michael Hanselmann | def _ProcessOutput(self, line, prog): |
436 | 34c9ee7b | Michael Hanselmann | """Takes care of child process output.
|
437 | 34c9ee7b | Michael Hanselmann |
|
438 | 34c9ee7b | Michael Hanselmann | @type line: string
|
439 | 34c9ee7b | Michael Hanselmann | @param line: Child output line
|
440 | 34c9ee7b | Michael Hanselmann | @type prog: number
|
441 | 34c9ee7b | Michael Hanselmann | @param prog: Program from which the line originates
|
442 | 34c9ee7b | Michael Hanselmann |
|
443 | 34c9ee7b | Michael Hanselmann | """
|
444 | 34c9ee7b | Michael Hanselmann | force_update = False
|
445 | 34c9ee7b | Michael Hanselmann | forward_line = line |
446 | 34c9ee7b | Michael Hanselmann | |
447 | 34c9ee7b | Michael Hanselmann | if prog == PROG_SOCAT:
|
448 | 34c9ee7b | Michael Hanselmann | level = None
|
449 | 34c9ee7b | Michael Hanselmann | parts = line.split(None, 4) |
450 | 34c9ee7b | Michael Hanselmann | |
451 | 34c9ee7b | Michael Hanselmann | if len(parts) == 5: |
452 | 34c9ee7b | Michael Hanselmann | (_, _, _, level, msg) = parts |
453 | 34c9ee7b | Michael Hanselmann | |
454 | 34c9ee7b | Michael Hanselmann | force_update = self._ProcessSocatOutput(self._status_file, level, msg) |
455 | 34c9ee7b | Michael Hanselmann | |
456 | 34c9ee7b | Michael Hanselmann | if self._debug or (level and level not in SOCAT_LOG_IGNORE): |
457 | 34c9ee7b | Michael Hanselmann | forward_line = "socat: %s %s" % (level, msg)
|
458 | 34c9ee7b | Michael Hanselmann | else:
|
459 | 34c9ee7b | Michael Hanselmann | forward_line = None
|
460 | 34c9ee7b | Michael Hanselmann | else:
|
461 | 34c9ee7b | Michael Hanselmann | forward_line = "socat: %s" % line
|
462 | 34c9ee7b | Michael Hanselmann | |
463 | c08d76f5 | Michael Hanselmann | elif prog == PROG_DD:
|
464 | c08d76f5 | Michael Hanselmann | (should_forward, force_update) = self._ProcessDdOutput(line)
|
465 | c08d76f5 | Michael Hanselmann | |
466 | c08d76f5 | Michael Hanselmann | if should_forward or self._debug: |
467 | c08d76f5 | Michael Hanselmann | forward_line = "dd: %s" % line
|
468 | c08d76f5 | Michael Hanselmann | else:
|
469 | c08d76f5 | Michael Hanselmann | forward_line = None
|
470 | c08d76f5 | Michael Hanselmann | |
471 | c08d76f5 | Michael Hanselmann | elif prog == PROG_DD_PID:
|
472 | c08d76f5 | Michael Hanselmann | if self._dd_pid: |
473 | c08d76f5 | Michael Hanselmann | raise RuntimeError("dd PID reported more than once") |
474 | c08d76f5 | Michael Hanselmann | logging.debug("Received dd PID %r", line)
|
475 | c08d76f5 | Michael Hanselmann | self._dd_pid = int(line) |
476 | c08d76f5 | Michael Hanselmann | forward_line = None
|
477 | c08d76f5 | Michael Hanselmann | |
478 | f9323011 | Michael Hanselmann | elif prog == PROG_EXP_SIZE:
|
479 | f9323011 | Michael Hanselmann | logging.debug("Received predicted size %r", line)
|
480 | f9323011 | Michael Hanselmann | forward_line = None
|
481 | f9323011 | Michael Hanselmann | |
482 | f9323011 | Michael Hanselmann | if line:
|
483 | f9323011 | Michael Hanselmann | try:
|
484 | f9323011 | Michael Hanselmann | exp_size = utils.BytesToMebibyte(int(line))
|
485 | f9323011 | Michael Hanselmann | except (ValueError, TypeError), err: |
486 | f9323011 | Michael Hanselmann | logging.error("Failed to convert predicted size %r to number: %s",
|
487 | f9323011 | Michael Hanselmann | line, err) |
488 | f9323011 | Michael Hanselmann | exp_size = None
|
489 | f9323011 | Michael Hanselmann | else:
|
490 | f9323011 | Michael Hanselmann | exp_size = None
|
491 | f9323011 | Michael Hanselmann | |
492 | f9323011 | Michael Hanselmann | self._exp_size = exp_size
|
493 | f9323011 | Michael Hanselmann | |
494 | 34c9ee7b | Michael Hanselmann | if forward_line:
|
495 | 34c9ee7b | Michael Hanselmann | self._logger.info(forward_line)
|
496 | 34c9ee7b | Michael Hanselmann | self._status_file.AddRecentOutput(forward_line)
|
497 | 34c9ee7b | Michael Hanselmann | |
498 | 34c9ee7b | Michael Hanselmann | self._status_file.Update(force_update)
|
499 | 34c9ee7b | Michael Hanselmann | |
500 | 34c9ee7b | Michael Hanselmann | @staticmethod
|
501 | 34c9ee7b | Michael Hanselmann | def _ProcessSocatOutput(status_file, level, msg): |
502 | 34c9ee7b | Michael Hanselmann | """Interprets socat log output.
|
503 | 34c9ee7b | Michael Hanselmann |
|
504 | 34c9ee7b | Michael Hanselmann | """
|
505 | 34c9ee7b | Michael Hanselmann | if level == SOCAT_LOG_NOTICE:
|
506 | 34c9ee7b | Michael Hanselmann | if status_file.GetListenPort() is None: |
507 | 34c9ee7b | Michael Hanselmann | # TODO: Maybe implement timeout to not listen forever
|
508 | 34c9ee7b | Michael Hanselmann | m = LISTENING_RE.match(msg) |
509 | 34c9ee7b | Michael Hanselmann | if m:
|
510 | 34c9ee7b | Michael Hanselmann | (_, port) = _VerifyListening(int(m.group("family")), |
511 | 34c9ee7b | Michael Hanselmann | m.group("address"),
|
512 | 34c9ee7b | Michael Hanselmann | int(m.group("port"))) |
513 | 34c9ee7b | Michael Hanselmann | |
514 | 34c9ee7b | Michael Hanselmann | status_file.SetListenPort(port) |
515 | 34c9ee7b | Michael Hanselmann | return True |
516 | 34c9ee7b | Michael Hanselmann | |
517 | 34c9ee7b | Michael Hanselmann | if not status_file.GetConnected(): |
518 | 34c9ee7b | Michael Hanselmann | m = TRANSFER_LOOP_RE.match(msg) |
519 | 34c9ee7b | Michael Hanselmann | if m:
|
520 | 53dbf14c | Michael Hanselmann | logging.debug("Connection established")
|
521 | 34c9ee7b | Michael Hanselmann | status_file.SetConnected() |
522 | 34c9ee7b | Michael Hanselmann | return True |
523 | 34c9ee7b | Michael Hanselmann | |
524 | 34c9ee7b | Michael Hanselmann | return False |
525 | c08d76f5 | Michael Hanselmann | |
526 | c08d76f5 | Michael Hanselmann | def _ProcessDdOutput(self, line): |
527 | c08d76f5 | Michael Hanselmann | """Interprets a line of dd(1)'s output.
|
528 | c08d76f5 | Michael Hanselmann |
|
529 | c08d76f5 | Michael Hanselmann | """
|
530 | c08d76f5 | Michael Hanselmann | m = DD_INFO_RE.match(line) |
531 | c08d76f5 | Michael Hanselmann | if m:
|
532 | c08d76f5 | Michael Hanselmann | seconds = float(m.group("seconds")) |
533 | c08d76f5 | Michael Hanselmann | mbytes = utils.BytesToMebibyte(int(m.group("bytes"))) |
534 | c08d76f5 | Michael Hanselmann | self._UpdateDdProgress(seconds, mbytes)
|
535 | c08d76f5 | Michael Hanselmann | return (False, True) |
536 | c08d76f5 | Michael Hanselmann | |
537 | c08d76f5 | Michael Hanselmann | m = DD_STDERR_IGNORE.match(line) |
538 | c08d76f5 | Michael Hanselmann | if m:
|
539 | c08d76f5 | Michael Hanselmann | # Ignore
|
540 | c08d76f5 | Michael Hanselmann | return (False, False) |
541 | c08d76f5 | Michael Hanselmann | |
542 | c08d76f5 | Michael Hanselmann | # Forward line
|
543 | c08d76f5 | Michael Hanselmann | return (True, False) |
544 | c08d76f5 | Michael Hanselmann | |
545 | c08d76f5 | Michael Hanselmann | def _UpdateDdProgress(self, seconds, mbytes): |
546 | c08d76f5 | Michael Hanselmann | """Updates the internal status variables for dd(1) progress.
|
547 | c08d76f5 | Michael Hanselmann |
|
548 | c08d76f5 | Michael Hanselmann | @type seconds: float
|
549 | c08d76f5 | Michael Hanselmann | @param seconds: Timestamp of this update
|
550 | c08d76f5 | Michael Hanselmann | @type mbytes: float
|
551 | c08d76f5 | Michael Hanselmann | @param mbytes: Total number of MiB transferred so far
|
552 | c08d76f5 | Michael Hanselmann |
|
553 | c08d76f5 | Michael Hanselmann | """
|
554 | c08d76f5 | Michael Hanselmann | # Add latest sample
|
555 | c08d76f5 | Michael Hanselmann | self._dd_progress.append((seconds, mbytes))
|
556 | c08d76f5 | Michael Hanselmann | |
557 | c08d76f5 | Michael Hanselmann | # Remove old samples
|
558 | c08d76f5 | Michael Hanselmann | del self._dd_progress[:-self._dd_tp_samples] |
559 | c08d76f5 | Michael Hanselmann | |
560 | c08d76f5 | Michael Hanselmann | # Calculate throughput
|
561 | c08d76f5 | Michael Hanselmann | throughput = _CalcThroughput(self._dd_progress)
|
562 | c08d76f5 | Michael Hanselmann | |
563 | c08d76f5 | Michael Hanselmann | # Calculate percent and ETA
|
564 | c08d76f5 | Michael Hanselmann | percent = None
|
565 | c08d76f5 | Michael Hanselmann | eta = None
|
566 | c08d76f5 | Michael Hanselmann | |
567 | c08d76f5 | Michael Hanselmann | if self._exp_size is not None: |
568 | c08d76f5 | Michael Hanselmann | if self._exp_size != 0: |
569 | c08d76f5 | Michael Hanselmann | percent = max(0, min(100, (100.0 * mbytes) / self._exp_size)) |
570 | c08d76f5 | Michael Hanselmann | |
571 | c08d76f5 | Michael Hanselmann | if throughput:
|
572 | c08d76f5 | Michael Hanselmann | eta = max(0, float(self._exp_size - mbytes) / throughput) |
573 | c08d76f5 | Michael Hanselmann | |
574 | c08d76f5 | Michael Hanselmann | self._status_file.SetProgress(mbytes, throughput, percent, eta)
|
575 | c08d76f5 | Michael Hanselmann | |
576 | c08d76f5 | Michael Hanselmann | |
577 | c08d76f5 | Michael Hanselmann | def _CalcThroughput(samples): |
578 | c08d76f5 | Michael Hanselmann | """Calculates the throughput in MiB/second.
|
579 | c08d76f5 | Michael Hanselmann |
|
580 | c08d76f5 | Michael Hanselmann | @type samples: sequence
|
581 | c08d76f5 | Michael Hanselmann | @param samples: List of samples, each consisting of a (timestamp, mbytes)
|
582 | c08d76f5 | Michael Hanselmann | tuple
|
583 | c08d76f5 | Michael Hanselmann | @rtype: float or None
|
584 | c08d76f5 | Michael Hanselmann | @return: Throughput in MiB/second
|
585 | c08d76f5 | Michael Hanselmann |
|
586 | c08d76f5 | Michael Hanselmann | """
|
587 | c08d76f5 | Michael Hanselmann | if len(samples) < 2: |
588 | c08d76f5 | Michael Hanselmann | # Can't calculate throughput
|
589 | c08d76f5 | Michael Hanselmann | return None |
590 | c08d76f5 | Michael Hanselmann | |
591 | c08d76f5 | Michael Hanselmann | (start_time, start_mbytes) = samples[0]
|
592 | c08d76f5 | Michael Hanselmann | (end_time, end_mbytes) = samples[-1]
|
593 | c08d76f5 | Michael Hanselmann | |
594 | c08d76f5 | Michael Hanselmann | return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time) |