Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ b705c7a6

History | View | Annotate | Download (43.4 kB)

1 033a1d00 Michael Hanselmann
#
2 033a1d00 Michael Hanselmann
#
3 033a1d00 Michael Hanselmann
4 033a1d00 Michael Hanselmann
# Copyright (C) 2010 Google Inc.
5 033a1d00 Michael Hanselmann
#
6 033a1d00 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 033a1d00 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 033a1d00 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 033a1d00 Michael Hanselmann
# (at your option) any later version.
10 033a1d00 Michael Hanselmann
#
11 033a1d00 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 033a1d00 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 033a1d00 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 033a1d00 Michael Hanselmann
# General Public License for more details.
15 033a1d00 Michael Hanselmann
#
16 033a1d00 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 033a1d00 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 033a1d00 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 033a1d00 Michael Hanselmann
# 02110-1301, USA.
20 033a1d00 Michael Hanselmann
21 033a1d00 Michael Hanselmann
22 033a1d00 Michael Hanselmann
"""Instance-related functions and classes for masterd.
23 033a1d00 Michael Hanselmann

24 033a1d00 Michael Hanselmann
"""
25 033a1d00 Michael Hanselmann
26 033a1d00 Michael Hanselmann
import logging
27 033a1d00 Michael Hanselmann
import time
28 4a96f1d1 Michael Hanselmann
import OpenSSL
29 033a1d00 Michael Hanselmann
30 033a1d00 Michael Hanselmann
from ganeti import constants
31 033a1d00 Michael Hanselmann
from ganeti import errors
32 033a1d00 Michael Hanselmann
from ganeti import compat
33 387794f8 Michael Hanselmann
from ganeti import utils
34 387794f8 Michael Hanselmann
from ganeti import objects
35 a744b676 Manuel Franceschini
from ganeti import netutils
36 033a1d00 Michael Hanselmann
37 033a1d00 Michael Hanselmann
38 033a1d00 Michael Hanselmann
class _ImportExportError(Exception):
39 033a1d00 Michael Hanselmann
  """Local exception to report import/export errors.
40 033a1d00 Michael Hanselmann

41 033a1d00 Michael Hanselmann
  """
42 033a1d00 Michael Hanselmann
43 033a1d00 Michael Hanselmann
44 033a1d00 Michael Hanselmann
class ImportExportTimeouts(object):
45 033a1d00 Michael Hanselmann
  #: Time until daemon starts writing status file
46 033a1d00 Michael Hanselmann
  DEFAULT_READY_TIMEOUT = 10
47 033a1d00 Michael Hanselmann
48 033a1d00 Michael Hanselmann
  #: Length of time until errors cause hard failure
49 033a1d00 Michael Hanselmann
  DEFAULT_ERROR_TIMEOUT = 10
50 033a1d00 Michael Hanselmann
51 033a1d00 Michael Hanselmann
  #: Time after which daemon must be listening
52 033a1d00 Michael Hanselmann
  DEFAULT_LISTEN_TIMEOUT = 10
53 033a1d00 Michael Hanselmann
54 1a2e7fe9 Michael Hanselmann
  #: Progress update interval
55 1a2e7fe9 Michael Hanselmann
  DEFAULT_PROGRESS_INTERVAL = 60
56 1a2e7fe9 Michael Hanselmann
57 033a1d00 Michael Hanselmann
  __slots__ = [
58 033a1d00 Michael Hanselmann
    "error",
59 033a1d00 Michael Hanselmann
    "ready",
60 033a1d00 Michael Hanselmann
    "listen",
61 033a1d00 Michael Hanselmann
    "connect",
62 1a2e7fe9 Michael Hanselmann
    "progress",
63 033a1d00 Michael Hanselmann
    ]
64 033a1d00 Michael Hanselmann
65 033a1d00 Michael Hanselmann
  def __init__(self, connect,
66 033a1d00 Michael Hanselmann
               listen=DEFAULT_LISTEN_TIMEOUT,
67 033a1d00 Michael Hanselmann
               error=DEFAULT_ERROR_TIMEOUT,
68 1a2e7fe9 Michael Hanselmann
               ready=DEFAULT_READY_TIMEOUT,
69 1a2e7fe9 Michael Hanselmann
               progress=DEFAULT_PROGRESS_INTERVAL):
70 033a1d00 Michael Hanselmann
    """Initializes this class.
71 033a1d00 Michael Hanselmann

72 033a1d00 Michael Hanselmann
    @type connect: number
73 033a1d00 Michael Hanselmann
    @param connect: Timeout for establishing connection
74 033a1d00 Michael Hanselmann
    @type listen: number
75 033a1d00 Michael Hanselmann
    @param listen: Timeout for starting to listen for connections
76 033a1d00 Michael Hanselmann
    @type error: number
77 033a1d00 Michael Hanselmann
    @param error: Length of time until errors cause hard failure
78 033a1d00 Michael Hanselmann
    @type ready: number
79 033a1d00 Michael Hanselmann
    @param ready: Timeout for daemon to become ready
80 1a2e7fe9 Michael Hanselmann
    @type progress: number
81 1a2e7fe9 Michael Hanselmann
    @param progress: Progress update interval
82 033a1d00 Michael Hanselmann

83 033a1d00 Michael Hanselmann
    """
84 033a1d00 Michael Hanselmann
    self.error = error
85 033a1d00 Michael Hanselmann
    self.ready = ready
86 033a1d00 Michael Hanselmann
    self.listen = listen
87 033a1d00 Michael Hanselmann
    self.connect = connect
88 1a2e7fe9 Michael Hanselmann
    self.progress = progress
89 033a1d00 Michael Hanselmann
90 033a1d00 Michael Hanselmann
91 033a1d00 Michael Hanselmann
class ImportExportCbBase(object):
92 033a1d00 Michael Hanselmann
  """Callbacks for disk import/export.
93 033a1d00 Michael Hanselmann

94 033a1d00 Michael Hanselmann
  """
95 033a1d00 Michael Hanselmann
  def ReportListening(self, ie, private):
96 033a1d00 Michael Hanselmann
    """Called when daemon started listening.
97 033a1d00 Michael Hanselmann

98 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
99 033a1d00 Michael Hanselmann
    @param ie: Import/export object
100 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
101 033a1d00 Michael Hanselmann

102 033a1d00 Michael Hanselmann
    """
103 033a1d00 Michael Hanselmann
104 033a1d00 Michael Hanselmann
  def ReportConnected(self, ie, private):
105 033a1d00 Michael Hanselmann
    """Called when a connection has been established.
106 033a1d00 Michael Hanselmann

107 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
108 033a1d00 Michael Hanselmann
    @param ie: Import/export object
109 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
110 033a1d00 Michael Hanselmann

111 033a1d00 Michael Hanselmann
    """
112 033a1d00 Michael Hanselmann
113 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
114 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
115 1a2e7fe9 Michael Hanselmann

116 1a2e7fe9 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
117 1a2e7fe9 Michael Hanselmann
    @param ie: Import/export object
118 1a2e7fe9 Michael Hanselmann
    @param private: Private data passed to import/export object
119 1a2e7fe9 Michael Hanselmann

120 1a2e7fe9 Michael Hanselmann
    """
121 1a2e7fe9 Michael Hanselmann
122 033a1d00 Michael Hanselmann
  def ReportFinished(self, ie, private):
123 033a1d00 Michael Hanselmann
    """Called when a transfer has finished.
124 033a1d00 Michael Hanselmann

125 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
126 033a1d00 Michael Hanselmann
    @param ie: Import/export object
127 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
128 033a1d00 Michael Hanselmann

129 033a1d00 Michael Hanselmann
    """
130 033a1d00 Michael Hanselmann
131 033a1d00 Michael Hanselmann
132 033a1d00 Michael Hanselmann
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
133 033a1d00 Michael Hanselmann
  """Checks whether a timeout has expired.
134 033a1d00 Michael Hanselmann

135 033a1d00 Michael Hanselmann
  """
136 033a1d00 Michael Hanselmann
  return _time_fn() > (epoch + timeout)
137 033a1d00 Michael Hanselmann
138 033a1d00 Michael Hanselmann
139 033a1d00 Michael Hanselmann
class _DiskImportExportBase(object):
140 033a1d00 Michael Hanselmann
  MODE_TEXT = None
141 033a1d00 Michael Hanselmann
142 eb630f50 Michael Hanselmann
  def __init__(self, lu, node_name, opts,
143 033a1d00 Michael Hanselmann
               instance, timeouts, cbs, private=None):
144 033a1d00 Michael Hanselmann
    """Initializes this class.
145 033a1d00 Michael Hanselmann

146 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
147 033a1d00 Michael Hanselmann
    @type node_name: string
148 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
149 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
150 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
151 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
152 033a1d00 Michael Hanselmann
    @param instance: Instance object
153 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
154 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
155 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
156 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
157 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
158 033a1d00 Michael Hanselmann

159 033a1d00 Michael Hanselmann
    """
160 033a1d00 Michael Hanselmann
    assert self.MODE_TEXT
161 033a1d00 Michael Hanselmann
162 033a1d00 Michael Hanselmann
    self._lu = lu
163 033a1d00 Michael Hanselmann
    self.node_name = node_name
164 eb630f50 Michael Hanselmann
    self._opts = opts
165 033a1d00 Michael Hanselmann
    self._instance = instance
166 033a1d00 Michael Hanselmann
    self._timeouts = timeouts
167 033a1d00 Michael Hanselmann
    self._cbs = cbs
168 033a1d00 Michael Hanselmann
    self._private = private
169 033a1d00 Michael Hanselmann
170 033a1d00 Michael Hanselmann
    # Parent loop
171 033a1d00 Michael Hanselmann
    self._loop = None
172 033a1d00 Michael Hanselmann
173 033a1d00 Michael Hanselmann
    # Timestamps
174 033a1d00 Michael Hanselmann
    self._ts_begin = None
175 033a1d00 Michael Hanselmann
    self._ts_connected = None
176 033a1d00 Michael Hanselmann
    self._ts_finished = None
177 033a1d00 Michael Hanselmann
    self._ts_cleanup = None
178 1a2e7fe9 Michael Hanselmann
    self._ts_last_progress = None
179 033a1d00 Michael Hanselmann
    self._ts_last_error = None
180 033a1d00 Michael Hanselmann
181 033a1d00 Michael Hanselmann
    # Transfer status
182 033a1d00 Michael Hanselmann
    self.success = None
183 033a1d00 Michael Hanselmann
    self.final_message = None
184 033a1d00 Michael Hanselmann
185 033a1d00 Michael Hanselmann
    # Daemon status
186 033a1d00 Michael Hanselmann
    self._daemon_name = None
187 033a1d00 Michael Hanselmann
    self._daemon = None
188 033a1d00 Michael Hanselmann
189 033a1d00 Michael Hanselmann
  @property
190 033a1d00 Michael Hanselmann
  def recent_output(self):
191 033a1d00 Michael Hanselmann
    """Returns the most recent output from the daemon.
192 033a1d00 Michael Hanselmann

193 033a1d00 Michael Hanselmann
    """
194 033a1d00 Michael Hanselmann
    if self._daemon:
195 033a1d00 Michael Hanselmann
      return self._daemon.recent_output
196 033a1d00 Michael Hanselmann
197 033a1d00 Michael Hanselmann
    return None
198 033a1d00 Michael Hanselmann
199 033a1d00 Michael Hanselmann
  @property
200 1a2e7fe9 Michael Hanselmann
  def progress(self):
201 1a2e7fe9 Michael Hanselmann
    """Returns transfer progress information.
202 1a2e7fe9 Michael Hanselmann

203 1a2e7fe9 Michael Hanselmann
    """
204 1a2e7fe9 Michael Hanselmann
    if not self._daemon:
205 1a2e7fe9 Michael Hanselmann
      return None
206 1a2e7fe9 Michael Hanselmann
207 1a2e7fe9 Michael Hanselmann
    return (self._daemon.progress_mbytes,
208 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_throughput,
209 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_percent,
210 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_eta)
211 1a2e7fe9 Michael Hanselmann
212 1a2e7fe9 Michael Hanselmann
  @property
213 af1d39b1 Michael Hanselmann
  def magic(self):
214 af1d39b1 Michael Hanselmann
    """Returns the magic value for this import/export.
215 af1d39b1 Michael Hanselmann

216 af1d39b1 Michael Hanselmann
    """
217 af1d39b1 Michael Hanselmann
    return self._opts.magic
218 af1d39b1 Michael Hanselmann
219 af1d39b1 Michael Hanselmann
  @property
220 033a1d00 Michael Hanselmann
  def active(self):
221 033a1d00 Michael Hanselmann
    """Determines whether this transport is still active.
222 033a1d00 Michael Hanselmann

223 033a1d00 Michael Hanselmann
    """
224 033a1d00 Michael Hanselmann
    return self.success is None
225 033a1d00 Michael Hanselmann
226 033a1d00 Michael Hanselmann
  @property
227 033a1d00 Michael Hanselmann
  def loop(self):
228 033a1d00 Michael Hanselmann
    """Returns parent loop.
229 033a1d00 Michael Hanselmann

230 033a1d00 Michael Hanselmann
    @rtype: L{ImportExportLoop}
231 033a1d00 Michael Hanselmann

232 033a1d00 Michael Hanselmann
    """
233 033a1d00 Michael Hanselmann
    return self._loop
234 033a1d00 Michael Hanselmann
235 033a1d00 Michael Hanselmann
  def SetLoop(self, loop):
236 033a1d00 Michael Hanselmann
    """Sets the parent loop.
237 033a1d00 Michael Hanselmann

238 033a1d00 Michael Hanselmann
    @type loop: L{ImportExportLoop}
239 033a1d00 Michael Hanselmann

240 033a1d00 Michael Hanselmann
    """
241 033a1d00 Michael Hanselmann
    if self._loop:
242 033a1d00 Michael Hanselmann
      raise errors.ProgrammerError("Loop can only be set once")
243 033a1d00 Michael Hanselmann
244 033a1d00 Michael Hanselmann
    self._loop = loop
245 033a1d00 Michael Hanselmann
246 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
247 033a1d00 Michael Hanselmann
    """Starts the import/export daemon.
248 033a1d00 Michael Hanselmann

249 033a1d00 Michael Hanselmann
    """
250 033a1d00 Michael Hanselmann
    raise NotImplementedError()
251 033a1d00 Michael Hanselmann
252 033a1d00 Michael Hanselmann
  def CheckDaemon(self):
253 033a1d00 Michael Hanselmann
    """Checks whether daemon has been started and if not, starts it.
254 033a1d00 Michael Hanselmann

255 033a1d00 Michael Hanselmann
    @rtype: string
256 033a1d00 Michael Hanselmann
    @return: Daemon name
257 033a1d00 Michael Hanselmann

258 033a1d00 Michael Hanselmann
    """
259 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
260 033a1d00 Michael Hanselmann
261 033a1d00 Michael Hanselmann
    if self._daemon_name is None:
262 033a1d00 Michael Hanselmann
      assert self._ts_begin is None
263 033a1d00 Michael Hanselmann
264 033a1d00 Michael Hanselmann
      result = self._StartDaemon()
265 033a1d00 Michael Hanselmann
      if result.fail_msg:
266 033a1d00 Michael Hanselmann
        raise _ImportExportError("Failed to start %s on %s: %s" %
267 033a1d00 Michael Hanselmann
                                 (self.MODE_TEXT, self.node_name,
268 033a1d00 Michael Hanselmann
                                  result.fail_msg))
269 033a1d00 Michael Hanselmann
270 033a1d00 Michael Hanselmann
      daemon_name = result.payload
271 033a1d00 Michael Hanselmann
272 033a1d00 Michael Hanselmann
      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
273 033a1d00 Michael Hanselmann
                   self.node_name)
274 033a1d00 Michael Hanselmann
275 033a1d00 Michael Hanselmann
      self._ts_begin = time.time()
276 033a1d00 Michael Hanselmann
      self._daemon_name = daemon_name
277 033a1d00 Michael Hanselmann
278 033a1d00 Michael Hanselmann
    return self._daemon_name
279 033a1d00 Michael Hanselmann
280 033a1d00 Michael Hanselmann
  def GetDaemonName(self):
281 033a1d00 Michael Hanselmann
    """Returns the daemon name.
282 033a1d00 Michael Hanselmann

283 033a1d00 Michael Hanselmann
    """
284 033a1d00 Michael Hanselmann
    assert self._daemon_name, "Daemon has not been started"
285 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
286 033a1d00 Michael Hanselmann
    return self._daemon_name
287 033a1d00 Michael Hanselmann
288 033a1d00 Michael Hanselmann
  def Abort(self):
289 033a1d00 Michael Hanselmann
    """Sends SIGTERM to import/export daemon (if still active).
290 033a1d00 Michael Hanselmann

291 033a1d00 Michael Hanselmann
    """
292 033a1d00 Michael Hanselmann
    if self._daemon_name:
293 033a1d00 Michael Hanselmann
      self._lu.LogWarning("Aborting %s %r on %s",
294 033a1d00 Michael Hanselmann
                          self.MODE_TEXT, self._daemon_name, self.node_name)
295 033a1d00 Michael Hanselmann
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
296 033a1d00 Michael Hanselmann
      if result.fail_msg:
297 033a1d00 Michael Hanselmann
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
298 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
299 033a1d00 Michael Hanselmann
                            self.node_name, result.fail_msg)
300 033a1d00 Michael Hanselmann
        return False
301 033a1d00 Michael Hanselmann
302 033a1d00 Michael Hanselmann
    return True
303 033a1d00 Michael Hanselmann
304 033a1d00 Michael Hanselmann
  def _SetDaemonData(self, data):
305 033a1d00 Michael Hanselmann
    """Internal function for updating status daemon data.
306 033a1d00 Michael Hanselmann

307 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
308 033a1d00 Michael Hanselmann
    @param data: Daemon status data
309 033a1d00 Michael Hanselmann

310 033a1d00 Michael Hanselmann
    """
311 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
312 033a1d00 Michael Hanselmann
313 033a1d00 Michael Hanselmann
    if not data:
314 033a1d00 Michael Hanselmann
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
315 033a1d00 Michael Hanselmann
        raise _ImportExportError("Didn't become ready after %s seconds" %
316 033a1d00 Michael Hanselmann
                                 self._timeouts.ready)
317 033a1d00 Michael Hanselmann
318 033a1d00 Michael Hanselmann
      return False
319 033a1d00 Michael Hanselmann
320 033a1d00 Michael Hanselmann
    self._daemon = data
321 033a1d00 Michael Hanselmann
322 033a1d00 Michael Hanselmann
    return True
323 033a1d00 Michael Hanselmann
324 033a1d00 Michael Hanselmann
  def SetDaemonData(self, success, data):
325 033a1d00 Michael Hanselmann
    """Updates daemon status data.
326 033a1d00 Michael Hanselmann

327 033a1d00 Michael Hanselmann
    @type success: bool
328 033a1d00 Michael Hanselmann
    @param success: Whether fetching data was successful or not
329 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
330 033a1d00 Michael Hanselmann
    @param data: Daemon status data
331 033a1d00 Michael Hanselmann

332 033a1d00 Michael Hanselmann
    """
333 033a1d00 Michael Hanselmann
    if not success:
334 033a1d00 Michael Hanselmann
      if self._ts_last_error is None:
335 033a1d00 Michael Hanselmann
        self._ts_last_error = time.time()
336 033a1d00 Michael Hanselmann
337 033a1d00 Michael Hanselmann
      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
338 033a1d00 Michael Hanselmann
        raise _ImportExportError("Too many errors while updating data")
339 033a1d00 Michael Hanselmann
340 033a1d00 Michael Hanselmann
      return False
341 033a1d00 Michael Hanselmann
342 033a1d00 Michael Hanselmann
    self._ts_last_error = None
343 033a1d00 Michael Hanselmann
344 033a1d00 Michael Hanselmann
    return self._SetDaemonData(data)
345 033a1d00 Michael Hanselmann
346 033a1d00 Michael Hanselmann
  def CheckListening(self):
347 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
348 033a1d00 Michael Hanselmann

349 033a1d00 Michael Hanselmann
    """
350 033a1d00 Michael Hanselmann
    raise NotImplementedError()
351 033a1d00 Michael Hanselmann
352 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
353 033a1d00 Michael Hanselmann
    """Returns timeout to calculate connect timeout.
354 033a1d00 Michael Hanselmann

355 033a1d00 Michael Hanselmann
    """
356 033a1d00 Michael Hanselmann
    raise NotImplementedError()
357 033a1d00 Michael Hanselmann
358 033a1d00 Michael Hanselmann
  def CheckConnected(self):
359 033a1d00 Michael Hanselmann
    """Checks whether the daemon is connected.
360 033a1d00 Michael Hanselmann

361 033a1d00 Michael Hanselmann
    @rtype: bool
362 033a1d00 Michael Hanselmann
    @return: Whether the daemon is connected
363 033a1d00 Michael Hanselmann

364 033a1d00 Michael Hanselmann
    """
365 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
366 033a1d00 Michael Hanselmann
367 033a1d00 Michael Hanselmann
    if self._ts_connected is not None:
368 033a1d00 Michael Hanselmann
      return True
369 033a1d00 Michael Hanselmann
370 033a1d00 Michael Hanselmann
    if self._daemon.connected:
371 033a1d00 Michael Hanselmann
      self._ts_connected = time.time()
372 033a1d00 Michael Hanselmann
373 033a1d00 Michael Hanselmann
      # TODO: Log remote peer
374 033a1d00 Michael Hanselmann
      logging.debug("%s %r on %s is now connected",
375 033a1d00 Michael Hanselmann
                    self.MODE_TEXT, self._daemon_name, self.node_name)
376 033a1d00 Michael Hanselmann
377 033a1d00 Michael Hanselmann
      self._cbs.ReportConnected(self, self._private)
378 033a1d00 Michael Hanselmann
379 033a1d00 Michael Hanselmann
      return True
380 033a1d00 Michael Hanselmann
381 033a1d00 Michael Hanselmann
    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
382 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not connected after %s seconds" %
383 033a1d00 Michael Hanselmann
                               self._timeouts.connect)
384 033a1d00 Michael Hanselmann
385 033a1d00 Michael Hanselmann
    return False
386 033a1d00 Michael Hanselmann
387 1a2e7fe9 Michael Hanselmann
  def _CheckProgress(self):
388 1a2e7fe9 Michael Hanselmann
    """Checks whether a progress update should be reported.
389 1a2e7fe9 Michael Hanselmann

390 1a2e7fe9 Michael Hanselmann
    """
391 1a2e7fe9 Michael Hanselmann
    if ((self._ts_last_progress is None or
392 1a2e7fe9 Michael Hanselmann
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
393 1a2e7fe9 Michael Hanselmann
        self._daemon and
394 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_mbytes is not None and
395 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_throughput is not None):
396 1a2e7fe9 Michael Hanselmann
      self._cbs.ReportProgress(self, self._private)
397 1a2e7fe9 Michael Hanselmann
      self._ts_last_progress = time.time()
398 1a2e7fe9 Michael Hanselmann
399 033a1d00 Michael Hanselmann
  def CheckFinished(self):
400 033a1d00 Michael Hanselmann
    """Checks whether the daemon exited.
401 033a1d00 Michael Hanselmann

402 033a1d00 Michael Hanselmann
    @rtype: bool
403 033a1d00 Michael Hanselmann
    @return: Whether the transfer is finished
404 033a1d00 Michael Hanselmann

405 033a1d00 Michael Hanselmann
    """
406 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
407 033a1d00 Michael Hanselmann
408 033a1d00 Michael Hanselmann
    if self._ts_finished:
409 033a1d00 Michael Hanselmann
      return True
410 033a1d00 Michael Hanselmann
411 033a1d00 Michael Hanselmann
    if self._daemon.exit_status is None:
412 1a2e7fe9 Michael Hanselmann
      # TODO: Adjust delay for ETA expiring soon
413 1a2e7fe9 Michael Hanselmann
      self._CheckProgress()
414 033a1d00 Michael Hanselmann
      return False
415 033a1d00 Michael Hanselmann
416 033a1d00 Michael Hanselmann
    self._ts_finished = time.time()
417 033a1d00 Michael Hanselmann
418 033a1d00 Michael Hanselmann
    self._ReportFinished(self._daemon.exit_status == 0,
419 033a1d00 Michael Hanselmann
                         self._daemon.error_message)
420 033a1d00 Michael Hanselmann
421 033a1d00 Michael Hanselmann
    return True
422 033a1d00 Michael Hanselmann
423 033a1d00 Michael Hanselmann
  def _ReportFinished(self, success, message):
424 033a1d00 Michael Hanselmann
    """Transfer is finished or daemon exited.
425 033a1d00 Michael Hanselmann

426 033a1d00 Michael Hanselmann
    @type success: bool
427 033a1d00 Michael Hanselmann
    @param success: Whether the transfer was successful
428 033a1d00 Michael Hanselmann
    @type message: string
429 033a1d00 Michael Hanselmann
    @param message: Error message
430 033a1d00 Michael Hanselmann

431 033a1d00 Michael Hanselmann
    """
432 033a1d00 Michael Hanselmann
    assert self.success is None
433 033a1d00 Michael Hanselmann
434 033a1d00 Michael Hanselmann
    self.success = success
435 033a1d00 Michael Hanselmann
    self.final_message = message
436 033a1d00 Michael Hanselmann
437 033a1d00 Michael Hanselmann
    if success:
438 033a1d00 Michael Hanselmann
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
439 033a1d00 Michael Hanselmann
                   self.node_name)
440 033a1d00 Michael Hanselmann
    elif self._daemon_name:
441 033a1d00 Michael Hanselmann
      self._lu.LogWarning("%s %r on %s failed: %s",
442 033a1d00 Michael Hanselmann
                          self.MODE_TEXT, self._daemon_name, self.node_name,
443 033a1d00 Michael Hanselmann
                          message)
444 033a1d00 Michael Hanselmann
    else:
445 033a1d00 Michael Hanselmann
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
446 033a1d00 Michael Hanselmann
                          self.node_name, message)
447 033a1d00 Michael Hanselmann
448 033a1d00 Michael Hanselmann
    self._cbs.ReportFinished(self, self._private)
449 033a1d00 Michael Hanselmann
450 033a1d00 Michael Hanselmann
  def _Finalize(self):
451 033a1d00 Michael Hanselmann
    """Makes the RPC call to finalize this import/export.
452 033a1d00 Michael Hanselmann

453 033a1d00 Michael Hanselmann
    """
454 033a1d00 Michael Hanselmann
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
455 033a1d00 Michael Hanselmann
456 033a1d00 Michael Hanselmann
  def Finalize(self, error=None):
457 033a1d00 Michael Hanselmann
    """Finalizes this import/export.
458 033a1d00 Michael Hanselmann

459 033a1d00 Michael Hanselmann
    """
460 033a1d00 Michael Hanselmann
    if self._daemon_name:
461 033a1d00 Michael Hanselmann
      logging.info("Finalizing %s %r on %s",
462 033a1d00 Michael Hanselmann
                   self.MODE_TEXT, self._daemon_name, self.node_name)
463 033a1d00 Michael Hanselmann
464 033a1d00 Michael Hanselmann
      result = self._Finalize()
465 033a1d00 Michael Hanselmann
      if result.fail_msg:
466 033a1d00 Michael Hanselmann
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
467 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
468 033a1d00 Michael Hanselmann
                            self.node_name, result.fail_msg)
469 033a1d00 Michael Hanselmann
        return False
470 033a1d00 Michael Hanselmann
471 033a1d00 Michael Hanselmann
      # Daemon is no longer running
472 033a1d00 Michael Hanselmann
      self._daemon_name = None
473 033a1d00 Michael Hanselmann
      self._ts_cleanup = time.time()
474 033a1d00 Michael Hanselmann
475 033a1d00 Michael Hanselmann
    if error:
476 033a1d00 Michael Hanselmann
      self._ReportFinished(False, error)
477 033a1d00 Michael Hanselmann
478 033a1d00 Michael Hanselmann
    return True
479 033a1d00 Michael Hanselmann
480 033a1d00 Michael Hanselmann
481 033a1d00 Michael Hanselmann
class DiskImport(_DiskImportExportBase):
482 033a1d00 Michael Hanselmann
  MODE_TEXT = "import"
483 033a1d00 Michael Hanselmann
484 eb630f50 Michael Hanselmann
  def __init__(self, lu, node_name, opts, instance,
485 033a1d00 Michael Hanselmann
               dest, dest_args, timeouts, cbs, private=None):
486 033a1d00 Michael Hanselmann
    """Initializes this class.
487 033a1d00 Michael Hanselmann

488 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
489 033a1d00 Michael Hanselmann
    @type node_name: string
490 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
491 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
492 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
493 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
494 033a1d00 Michael Hanselmann
    @param instance: Instance object
495 033a1d00 Michael Hanselmann
    @param dest: I/O destination
496 033a1d00 Michael Hanselmann
    @param dest_args: I/O arguments
497 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
498 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
499 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
500 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
501 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
502 033a1d00 Michael Hanselmann

503 033a1d00 Michael Hanselmann
    """
504 eb630f50 Michael Hanselmann
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
505 033a1d00 Michael Hanselmann
                                   instance, timeouts, cbs, private)
506 033a1d00 Michael Hanselmann
    self._dest = dest
507 033a1d00 Michael Hanselmann
    self._dest_args = dest_args
508 033a1d00 Michael Hanselmann
509 033a1d00 Michael Hanselmann
    # Timestamps
510 033a1d00 Michael Hanselmann
    self._ts_listening = None
511 033a1d00 Michael Hanselmann
512 033a1d00 Michael Hanselmann
  @property
513 033a1d00 Michael Hanselmann
  def listen_port(self):
514 033a1d00 Michael Hanselmann
    """Returns the port the daemon is listening on.
515 033a1d00 Michael Hanselmann

516 033a1d00 Michael Hanselmann
    """
517 033a1d00 Michael Hanselmann
    if self._daemon:
518 033a1d00 Michael Hanselmann
      return self._daemon.listen_port
519 033a1d00 Michael Hanselmann
520 033a1d00 Michael Hanselmann
    return None
521 033a1d00 Michael Hanselmann
522 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
523 033a1d00 Michael Hanselmann
    """Starts the import daemon.
524 033a1d00 Michael Hanselmann

525 033a1d00 Michael Hanselmann
    """
526 eb630f50 Michael Hanselmann
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
527 eb630f50 Michael Hanselmann
                                          self._instance,
528 033a1d00 Michael Hanselmann
                                          self._dest, self._dest_args)
529 033a1d00 Michael Hanselmann
530 033a1d00 Michael Hanselmann
  def CheckListening(self):
531 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
532 033a1d00 Michael Hanselmann

533 033a1d00 Michael Hanselmann
    @rtype: bool
534 033a1d00 Michael Hanselmann
    @return: Whether the daemon is listening
535 033a1d00 Michael Hanselmann

536 033a1d00 Michael Hanselmann
    """
537 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
538 033a1d00 Michael Hanselmann
539 033a1d00 Michael Hanselmann
    if self._ts_listening is not None:
540 033a1d00 Michael Hanselmann
      return True
541 033a1d00 Michael Hanselmann
542 033a1d00 Michael Hanselmann
    port = self._daemon.listen_port
543 033a1d00 Michael Hanselmann
    if port is not None:
544 033a1d00 Michael Hanselmann
      self._ts_listening = time.time()
545 033a1d00 Michael Hanselmann
546 033a1d00 Michael Hanselmann
      logging.debug("Import %r on %s is now listening on port %s",
547 033a1d00 Michael Hanselmann
                    self._daemon_name, self.node_name, port)
548 033a1d00 Michael Hanselmann
549 033a1d00 Michael Hanselmann
      self._cbs.ReportListening(self, self._private)
550 033a1d00 Michael Hanselmann
551 033a1d00 Michael Hanselmann
      return True
552 033a1d00 Michael Hanselmann
553 033a1d00 Michael Hanselmann
    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
554 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not listening after %s seconds" %
555 033a1d00 Michael Hanselmann
                               self._timeouts.listen)
556 033a1d00 Michael Hanselmann
557 033a1d00 Michael Hanselmann
    return False
558 033a1d00 Michael Hanselmann
559 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
560 033a1d00 Michael Hanselmann
    """Returns the time since we started listening.
561 033a1d00 Michael Hanselmann

562 033a1d00 Michael Hanselmann
    """
563 033a1d00 Michael Hanselmann
    assert self._ts_listening is not None, \
564 033a1d00 Michael Hanselmann
           ("Checking whether an import is connected is only useful"
565 033a1d00 Michael Hanselmann
            " once it's been listening")
566 033a1d00 Michael Hanselmann
567 033a1d00 Michael Hanselmann
    return self._ts_listening
568 033a1d00 Michael Hanselmann
569 033a1d00 Michael Hanselmann
570 033a1d00 Michael Hanselmann
class DiskExport(_DiskImportExportBase):
571 033a1d00 Michael Hanselmann
  MODE_TEXT = "export"
572 033a1d00 Michael Hanselmann
573 eb630f50 Michael Hanselmann
  def __init__(self, lu, node_name, opts,
574 033a1d00 Michael Hanselmann
               dest_host, dest_port, instance, source, source_args,
575 033a1d00 Michael Hanselmann
               timeouts, cbs, private=None):
576 033a1d00 Michael Hanselmann
    """Initializes this class.
577 033a1d00 Michael Hanselmann

578 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
579 033a1d00 Michael Hanselmann
    @type node_name: string
580 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
581 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
582 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
583 033a1d00 Michael Hanselmann
    @type dest_host: string
584 033a1d00 Michael Hanselmann
    @param dest_host: Destination host name or IP address
585 033a1d00 Michael Hanselmann
    @type dest_port: number
586 033a1d00 Michael Hanselmann
    @param dest_port: Destination port number
587 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
588 033a1d00 Michael Hanselmann
    @param instance: Instance object
589 033a1d00 Michael Hanselmann
    @param source: I/O source
590 033a1d00 Michael Hanselmann
    @param source_args: I/O source
591 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
592 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
593 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
594 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
595 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
596 033a1d00 Michael Hanselmann

597 033a1d00 Michael Hanselmann
    """
598 eb630f50 Michael Hanselmann
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
599 033a1d00 Michael Hanselmann
                                   instance, timeouts, cbs, private)
600 033a1d00 Michael Hanselmann
    self._dest_host = dest_host
601 033a1d00 Michael Hanselmann
    self._dest_port = dest_port
602 033a1d00 Michael Hanselmann
    self._source = source
603 033a1d00 Michael Hanselmann
    self._source_args = source_args
604 033a1d00 Michael Hanselmann
605 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
606 033a1d00 Michael Hanselmann
    """Starts the export daemon.
607 033a1d00 Michael Hanselmann

608 033a1d00 Michael Hanselmann
    """
609 eb630f50 Michael Hanselmann
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
610 033a1d00 Michael Hanselmann
                                          self._dest_host, self._dest_port,
611 033a1d00 Michael Hanselmann
                                          self._instance, self._source,
612 033a1d00 Michael Hanselmann
                                          self._source_args)
613 033a1d00 Michael Hanselmann
614 033a1d00 Michael Hanselmann
  def CheckListening(self):
615 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
616 033a1d00 Michael Hanselmann

617 033a1d00 Michael Hanselmann
    """
618 033a1d00 Michael Hanselmann
    # Only an import can be listening
619 033a1d00 Michael Hanselmann
    return True
620 033a1d00 Michael Hanselmann
621 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
622 033a1d00 Michael Hanselmann
    """Returns the time since the daemon started.
623 033a1d00 Michael Hanselmann

624 033a1d00 Michael Hanselmann
    """
625 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
626 033a1d00 Michael Hanselmann
627 033a1d00 Michael Hanselmann
    return self._ts_begin
628 033a1d00 Michael Hanselmann
629 033a1d00 Michael Hanselmann
630 1a2e7fe9 Michael Hanselmann
def FormatProgress(progress):
631 1a2e7fe9 Michael Hanselmann
  """Formats progress information for user consumption
632 1a2e7fe9 Michael Hanselmann

633 1a2e7fe9 Michael Hanselmann
  """
634 e6b8d02d Michael Hanselmann
  (mbytes, throughput, percent, eta) = progress
635 1a2e7fe9 Michael Hanselmann
636 1a2e7fe9 Michael Hanselmann
  parts = [
637 1a2e7fe9 Michael Hanselmann
    utils.FormatUnit(mbytes, "h"),
638 1a2e7fe9 Michael Hanselmann
639 1a2e7fe9 Michael Hanselmann
    # Not using FormatUnit as it doesn't support kilobytes
640 1a2e7fe9 Michael Hanselmann
    "%0.1f MiB/s" % throughput,
641 1a2e7fe9 Michael Hanselmann
    ]
642 1a2e7fe9 Michael Hanselmann
643 1a2e7fe9 Michael Hanselmann
  if percent is not None:
644 1a2e7fe9 Michael Hanselmann
    parts.append("%d%%" % percent)
645 1a2e7fe9 Michael Hanselmann
646 e6b8d02d Michael Hanselmann
  if eta is not None:
647 e6b8d02d Michael Hanselmann
    parts.append("ETA %s" % utils.FormatSeconds(eta))
648 1a2e7fe9 Michael Hanselmann
649 1a2e7fe9 Michael Hanselmann
  return utils.CommaJoin(parts)
650 1a2e7fe9 Michael Hanselmann
651 1a2e7fe9 Michael Hanselmann
652 033a1d00 Michael Hanselmann
class ImportExportLoop:
653 033a1d00 Michael Hanselmann
  MIN_DELAY = 1.0
654 033a1d00 Michael Hanselmann
  MAX_DELAY = 20.0
655 033a1d00 Michael Hanselmann
656 033a1d00 Michael Hanselmann
  def __init__(self, lu):
657 033a1d00 Michael Hanselmann
    """Initializes this class.
658 033a1d00 Michael Hanselmann

659 033a1d00 Michael Hanselmann
    """
660 033a1d00 Michael Hanselmann
    self._lu = lu
661 033a1d00 Michael Hanselmann
    self._queue = []
662 033a1d00 Michael Hanselmann
    self._pending_add = []
663 033a1d00 Michael Hanselmann
664 033a1d00 Michael Hanselmann
  def Add(self, diskie):
665 033a1d00 Michael Hanselmann
    """Adds an import/export object to the loop.
666 033a1d00 Michael Hanselmann

667 033a1d00 Michael Hanselmann
    @type diskie: Subclass of L{_DiskImportExportBase}
668 033a1d00 Michael Hanselmann
    @param diskie: Import/export object
669 033a1d00 Michael Hanselmann

670 033a1d00 Michael Hanselmann
    """
671 033a1d00 Michael Hanselmann
    assert diskie not in self._pending_add
672 033a1d00 Michael Hanselmann
    assert diskie.loop is None
673 033a1d00 Michael Hanselmann
674 033a1d00 Michael Hanselmann
    diskie.SetLoop(self)
675 033a1d00 Michael Hanselmann
676 033a1d00 Michael Hanselmann
    # Adding new objects to a staging list is necessary, otherwise the main
677 033a1d00 Michael Hanselmann
    # loop gets confused if callbacks modify the queue while the main loop is
678 033a1d00 Michael Hanselmann
    # iterating over it.
679 033a1d00 Michael Hanselmann
    self._pending_add.append(diskie)
680 033a1d00 Michael Hanselmann
681 033a1d00 Michael Hanselmann
  @staticmethod
682 033a1d00 Michael Hanselmann
  def _CollectDaemonStatus(lu, daemons):
683 033a1d00 Michael Hanselmann
    """Collects the status for all import/export daemons.
684 033a1d00 Michael Hanselmann

685 033a1d00 Michael Hanselmann
    """
686 033a1d00 Michael Hanselmann
    daemon_status = {}
687 033a1d00 Michael Hanselmann
688 033a1d00 Michael Hanselmann
    for node_name, names in daemons.iteritems():
689 033a1d00 Michael Hanselmann
      result = lu.rpc.call_impexp_status(node_name, names)
690 033a1d00 Michael Hanselmann
      if result.fail_msg:
691 033a1d00 Michael Hanselmann
        lu.LogWarning("Failed to get daemon status on %s: %s",
692 033a1d00 Michael Hanselmann
                      node_name, result.fail_msg)
693 033a1d00 Michael Hanselmann
        continue
694 033a1d00 Michael Hanselmann
695 033a1d00 Michael Hanselmann
      assert len(names) == len(result.payload)
696 033a1d00 Michael Hanselmann
697 033a1d00 Michael Hanselmann
      daemon_status[node_name] = dict(zip(names, result.payload))
698 033a1d00 Michael Hanselmann
699 033a1d00 Michael Hanselmann
    return daemon_status
700 033a1d00 Michael Hanselmann
701 033a1d00 Michael Hanselmann
  @staticmethod
702 033a1d00 Michael Hanselmann
  def _GetActiveDaemonNames(queue):
703 033a1d00 Michael Hanselmann
    """Gets the names of all active daemons.
704 033a1d00 Michael Hanselmann

705 033a1d00 Michael Hanselmann
    """
706 033a1d00 Michael Hanselmann
    result = {}
707 033a1d00 Michael Hanselmann
    for diskie in queue:
708 033a1d00 Michael Hanselmann
      if not diskie.active:
709 033a1d00 Michael Hanselmann
        continue
710 033a1d00 Michael Hanselmann
711 033a1d00 Michael Hanselmann
      try:
712 033a1d00 Michael Hanselmann
        # Start daemon if necessary
713 033a1d00 Michael Hanselmann
        daemon_name = diskie.CheckDaemon()
714 033a1d00 Michael Hanselmann
      except _ImportExportError, err:
715 033a1d00 Michael Hanselmann
        logging.exception("%s failed", diskie.MODE_TEXT)
716 033a1d00 Michael Hanselmann
        diskie.Finalize(error=str(err))
717 033a1d00 Michael Hanselmann
        continue
718 033a1d00 Michael Hanselmann
719 033a1d00 Michael Hanselmann
      result.setdefault(diskie.node_name, []).append(daemon_name)
720 033a1d00 Michael Hanselmann
721 033a1d00 Michael Hanselmann
    assert len(queue) >= len(result)
722 033a1d00 Michael Hanselmann
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
723 033a1d00 Michael Hanselmann
724 033a1d00 Michael Hanselmann
    logging.debug("daemons=%r", result)
725 033a1d00 Michael Hanselmann
726 033a1d00 Michael Hanselmann
    return result
727 033a1d00 Michael Hanselmann
728 033a1d00 Michael Hanselmann
  def _AddPendingToQueue(self):
729 033a1d00 Michael Hanselmann
    """Adds all pending import/export objects to the internal queue.
730 033a1d00 Michael Hanselmann

731 033a1d00 Michael Hanselmann
    """
732 033a1d00 Michael Hanselmann
    assert compat.all(diskie not in self._queue and diskie.loop == self
733 033a1d00 Michael Hanselmann
                      for diskie in self._pending_add)
734 033a1d00 Michael Hanselmann
735 033a1d00 Michael Hanselmann
    self._queue.extend(self._pending_add)
736 033a1d00 Michael Hanselmann
737 033a1d00 Michael Hanselmann
    del self._pending_add[:]
738 033a1d00 Michael Hanselmann
739 033a1d00 Michael Hanselmann
  def Run(self):
740 033a1d00 Michael Hanselmann
    """Utility main loop.
741 033a1d00 Michael Hanselmann

742 033a1d00 Michael Hanselmann
    """
743 033a1d00 Michael Hanselmann
    while True:
744 033a1d00 Michael Hanselmann
      self._AddPendingToQueue()
745 033a1d00 Michael Hanselmann
746 033a1d00 Michael Hanselmann
      # Collect all active daemon names
747 033a1d00 Michael Hanselmann
      daemons = self._GetActiveDaemonNames(self._queue)
748 033a1d00 Michael Hanselmann
      if not daemons:
749 033a1d00 Michael Hanselmann
        break
750 033a1d00 Michael Hanselmann
751 033a1d00 Michael Hanselmann
      # Collection daemon status data
752 033a1d00 Michael Hanselmann
      data = self._CollectDaemonStatus(self._lu, daemons)
753 033a1d00 Michael Hanselmann
754 033a1d00 Michael Hanselmann
      # Use data
755 033a1d00 Michael Hanselmann
      delay = self.MAX_DELAY
756 033a1d00 Michael Hanselmann
      for diskie in self._queue:
757 033a1d00 Michael Hanselmann
        if not diskie.active:
758 033a1d00 Michael Hanselmann
          continue
759 033a1d00 Michael Hanselmann
760 033a1d00 Michael Hanselmann
        try:
761 033a1d00 Michael Hanselmann
          try:
762 033a1d00 Michael Hanselmann
            all_daemon_data = data[diskie.node_name]
763 033a1d00 Michael Hanselmann
          except KeyError:
764 033a1d00 Michael Hanselmann
            result = diskie.SetDaemonData(False, None)
765 033a1d00 Michael Hanselmann
          else:
766 033a1d00 Michael Hanselmann
            result = \
767 033a1d00 Michael Hanselmann
              diskie.SetDaemonData(True,
768 033a1d00 Michael Hanselmann
                                   all_daemon_data[diskie.GetDaemonName()])
769 033a1d00 Michael Hanselmann
770 033a1d00 Michael Hanselmann
          if not result:
771 033a1d00 Michael Hanselmann
            # Daemon not yet ready, retry soon
772 033a1d00 Michael Hanselmann
            delay = min(3.0, delay)
773 033a1d00 Michael Hanselmann
            continue
774 033a1d00 Michael Hanselmann
775 033a1d00 Michael Hanselmann
          if diskie.CheckFinished():
776 033a1d00 Michael Hanselmann
            # Transfer finished
777 033a1d00 Michael Hanselmann
            diskie.Finalize()
778 033a1d00 Michael Hanselmann
            continue
779 033a1d00 Michael Hanselmann
780 033a1d00 Michael Hanselmann
          # Normal case: check again in 5 seconds
781 033a1d00 Michael Hanselmann
          delay = min(5.0, delay)
782 033a1d00 Michael Hanselmann
783 033a1d00 Michael Hanselmann
          if not diskie.CheckListening():
784 033a1d00 Michael Hanselmann
            # Not yet listening, retry soon
785 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
786 033a1d00 Michael Hanselmann
            continue
787 033a1d00 Michael Hanselmann
788 033a1d00 Michael Hanselmann
          if not diskie.CheckConnected():
789 033a1d00 Michael Hanselmann
            # Not yet connected, retry soon
790 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
791 033a1d00 Michael Hanselmann
            continue
792 033a1d00 Michael Hanselmann
793 033a1d00 Michael Hanselmann
        except _ImportExportError, err:
794 033a1d00 Michael Hanselmann
          logging.exception("%s failed", diskie.MODE_TEXT)
795 033a1d00 Michael Hanselmann
          diskie.Finalize(error=str(err))
796 033a1d00 Michael Hanselmann
797 403f5172 Guido Trotter
      if not compat.any(diskie.active for diskie in self._queue):
798 033a1d00 Michael Hanselmann
        break
799 033a1d00 Michael Hanselmann
800 033a1d00 Michael Hanselmann
      # Wait a bit
801 033a1d00 Michael Hanselmann
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
802 033a1d00 Michael Hanselmann
      logging.debug("Waiting for %ss", delay)
803 033a1d00 Michael Hanselmann
      time.sleep(delay)
804 033a1d00 Michael Hanselmann
805 033a1d00 Michael Hanselmann
  def FinalizeAll(self):
806 033a1d00 Michael Hanselmann
    """Finalizes all pending transfers.
807 033a1d00 Michael Hanselmann

808 033a1d00 Michael Hanselmann
    """
809 033a1d00 Michael Hanselmann
    success = True
810 033a1d00 Michael Hanselmann
811 033a1d00 Michael Hanselmann
    for diskie in self._queue:
812 033a1d00 Michael Hanselmann
      success = diskie.Finalize() and success
813 033a1d00 Michael Hanselmann
814 033a1d00 Michael Hanselmann
    return success
815 5d97d6dd Michael Hanselmann
816 5d97d6dd Michael Hanselmann
817 5d97d6dd Michael Hanselmann
class _TransferInstCbBase(ImportExportCbBase):
818 5d97d6dd Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
819 d51ae04c Michael Hanselmann
               dest_node, dest_ip):
820 5d97d6dd Michael Hanselmann
    """Initializes this class.
821 5d97d6dd Michael Hanselmann

822 5d97d6dd Michael Hanselmann
    """
823 5d97d6dd Michael Hanselmann
    ImportExportCbBase.__init__(self)
824 5d97d6dd Michael Hanselmann
825 5d97d6dd Michael Hanselmann
    self.lu = lu
826 5d97d6dd Michael Hanselmann
    self.feedback_fn = feedback_fn
827 5d97d6dd Michael Hanselmann
    self.instance = instance
828 5d97d6dd Michael Hanselmann
    self.timeouts = timeouts
829 5d97d6dd Michael Hanselmann
    self.src_node = src_node
830 5d97d6dd Michael Hanselmann
    self.src_cbs = src_cbs
831 5d97d6dd Michael Hanselmann
    self.dest_node = dest_node
832 5d97d6dd Michael Hanselmann
    self.dest_ip = dest_ip
833 5d97d6dd Michael Hanselmann
834 5d97d6dd Michael Hanselmann
835 5d97d6dd Michael Hanselmann
class _TransferInstSourceCb(_TransferInstCbBase):
836 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
837 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
838 5d97d6dd Michael Hanselmann

839 5d97d6dd Michael Hanselmann
    """
840 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
841 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
842 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
843 5d97d6dd Michael Hanselmann
844 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is sending data on %s" %
845 5d97d6dd Michael Hanselmann
                     (dtp.data.name, ie.node_name))
846 5d97d6dd Michael Hanselmann
847 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, dtp):
848 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
849 1a2e7fe9 Michael Hanselmann

850 1a2e7fe9 Michael Hanselmann
    """
851 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
852 1a2e7fe9 Michael Hanselmann
    if not progress:
853 1a2e7fe9 Michael Hanselmann
      return
854 1a2e7fe9 Michael Hanselmann
855 1a2e7fe9 Michael Hanselmann
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
856 1a2e7fe9 Michael Hanselmann
857 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
858 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
859 5d97d6dd Michael Hanselmann

860 5d97d6dd Michael Hanselmann
    """
861 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
862 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
863 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
864 5d97d6dd Michael Hanselmann
865 5d97d6dd Michael Hanselmann
    if ie.success:
866 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished sending data" % dtp.data.name)
867 5d97d6dd Michael Hanselmann
    else:
868 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
869 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
870 5d97d6dd Michael Hanselmann
871 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
872 5d97d6dd Michael Hanselmann
873 5d97d6dd Michael Hanselmann
    cb = dtp.data.finished_fn
874 5d97d6dd Michael Hanselmann
    if cb:
875 5d97d6dd Michael Hanselmann
      cb()
876 5d97d6dd Michael Hanselmann
877 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
878 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
879 5d97d6dd Michael Hanselmann
    if dtp.dest_import and not ie.success:
880 5d97d6dd Michael Hanselmann
      dtp.dest_import.Abort()
881 5d97d6dd Michael Hanselmann
882 5d97d6dd Michael Hanselmann
883 5d97d6dd Michael Hanselmann
class _TransferInstDestCb(_TransferInstCbBase):
884 5d97d6dd Michael Hanselmann
  def ReportListening(self, ie, dtp):
885 5d97d6dd Michael Hanselmann
    """Called when daemon started listening.
886 5d97d6dd Michael Hanselmann

887 5d97d6dd Michael Hanselmann
    """
888 5d97d6dd Michael Hanselmann
    assert self.src_cbs
889 5d97d6dd Michael Hanselmann
    assert dtp.src_export is None
890 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
891 d51ae04c Michael Hanselmann
    assert dtp.export_opts
892 5d97d6dd Michael Hanselmann
893 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
894 5d97d6dd Michael Hanselmann
895 5d97d6dd Michael Hanselmann
    # Start export on source node
896 d51ae04c Michael Hanselmann
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
897 a5310c2a Michael Hanselmann
                    self.dest_ip, ie.listen_port,
898 eb630f50 Michael Hanselmann
                    self.instance, dtp.data.src_io, dtp.data.src_ioargs,
899 5d97d6dd Michael Hanselmann
                    self.timeouts, self.src_cbs, private=dtp)
900 5d97d6dd Michael Hanselmann
    ie.loop.Add(de)
901 5d97d6dd Michael Hanselmann
902 5d97d6dd Michael Hanselmann
    dtp.src_export = de
903 5d97d6dd Michael Hanselmann
904 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
905 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
906 5d97d6dd Michael Hanselmann

907 5d97d6dd Michael Hanselmann
    """
908 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is receiving data on %s" %
909 5d97d6dd Michael Hanselmann
                     (dtp.data.name, self.dest_node))
910 5d97d6dd Michael Hanselmann
911 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
912 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
913 5d97d6dd Michael Hanselmann

914 5d97d6dd Michael Hanselmann
    """
915 5d97d6dd Michael Hanselmann
    if ie.success:
916 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
917 5d97d6dd Michael Hanselmann
    else:
918 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
919 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
920 5d97d6dd Michael Hanselmann
921 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
922 5d97d6dd Michael Hanselmann
923 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
924 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
925 5d97d6dd Michael Hanselmann
    if dtp.src_export and not ie.success:
926 5d97d6dd Michael Hanselmann
      dtp.src_export.Abort()
927 5d97d6dd Michael Hanselmann
928 5d97d6dd Michael Hanselmann
929 5d97d6dd Michael Hanselmann
class DiskTransfer(object):
930 5d97d6dd Michael Hanselmann
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
931 5d97d6dd Michael Hanselmann
               finished_fn):
932 5d97d6dd Michael Hanselmann
    """Initializes this class.
933 5d97d6dd Michael Hanselmann

934 5d97d6dd Michael Hanselmann
    @type name: string
935 5d97d6dd Michael Hanselmann
    @param name: User-visible name for this transfer (e.g. "disk/0")
936 5d97d6dd Michael Hanselmann
    @param src_io: Source I/O type
937 5d97d6dd Michael Hanselmann
    @param src_ioargs: Source I/O arguments
938 5d97d6dd Michael Hanselmann
    @param dest_io: Destination I/O type
939 5d97d6dd Michael Hanselmann
    @param dest_ioargs: Destination I/O arguments
940 5d97d6dd Michael Hanselmann
    @type finished_fn: callable
941 5d97d6dd Michael Hanselmann
    @param finished_fn: Function called once transfer has finished
942 5d97d6dd Michael Hanselmann

943 5d97d6dd Michael Hanselmann
    """
944 5d97d6dd Michael Hanselmann
    self.name = name
945 5d97d6dd Michael Hanselmann
946 5d97d6dd Michael Hanselmann
    self.src_io = src_io
947 5d97d6dd Michael Hanselmann
    self.src_ioargs = src_ioargs
948 5d97d6dd Michael Hanselmann
949 5d97d6dd Michael Hanselmann
    self.dest_io = dest_io
950 5d97d6dd Michael Hanselmann
    self.dest_ioargs = dest_ioargs
951 5d97d6dd Michael Hanselmann
952 5d97d6dd Michael Hanselmann
    self.finished_fn = finished_fn
953 5d97d6dd Michael Hanselmann
954 5d97d6dd Michael Hanselmann
955 5d97d6dd Michael Hanselmann
class _DiskTransferPrivate(object):
956 d51ae04c Michael Hanselmann
  def __init__(self, data, success, export_opts):
957 5d97d6dd Michael Hanselmann
    """Initializes this class.
958 5d97d6dd Michael Hanselmann

959 5d97d6dd Michael Hanselmann
    @type data: L{DiskTransfer}
960 5d97d6dd Michael Hanselmann
    @type success: bool
961 5d97d6dd Michael Hanselmann

962 5d97d6dd Michael Hanselmann
    """
963 5d97d6dd Michael Hanselmann
    self.data = data
964 d51ae04c Michael Hanselmann
    self.success = success
965 d51ae04c Michael Hanselmann
    self.export_opts = export_opts
966 5d97d6dd Michael Hanselmann
967 5d97d6dd Michael Hanselmann
    self.src_export = None
968 5d97d6dd Michael Hanselmann
    self.dest_import = None
969 5d97d6dd Michael Hanselmann
970 5d97d6dd Michael Hanselmann
  def RecordResult(self, success):
971 5d97d6dd Michael Hanselmann
    """Updates the status.
972 5d97d6dd Michael Hanselmann

973 5d97d6dd Michael Hanselmann
    One failed part will cause the whole transfer to fail.
974 5d97d6dd Michael Hanselmann

975 5d97d6dd Michael Hanselmann
    """
976 5d97d6dd Michael Hanselmann
    self.success = self.success and success
977 5d97d6dd Michael Hanselmann
978 5d97d6dd Michael Hanselmann
979 d51ae04c Michael Hanselmann
def _GetInstDiskMagic(base, instance_name, index):
980 d51ae04c Michael Hanselmann
  """Computes the magic value for a disk export or import.
981 d51ae04c Michael Hanselmann

982 d51ae04c Michael Hanselmann
  @type base: string
983 d51ae04c Michael Hanselmann
  @param base: Random seed value (can be the same for all disks of a transfer)
984 d51ae04c Michael Hanselmann
  @type instance_name: string
985 d51ae04c Michael Hanselmann
  @param instance_name: Name of instance
986 d51ae04c Michael Hanselmann
  @type index: number
987 d51ae04c Michael Hanselmann
  @param index: Disk index
988 d51ae04c Michael Hanselmann

989 d51ae04c Michael Hanselmann
  """
990 d51ae04c Michael Hanselmann
  h = compat.sha1_hash()
991 d51ae04c Michael Hanselmann
  h.update(str(constants.RIE_VERSION))
992 d51ae04c Michael Hanselmann
  h.update(base)
993 d51ae04c Michael Hanselmann
  h.update(instance_name)
994 d51ae04c Michael Hanselmann
  h.update(str(index))
995 d51ae04c Michael Hanselmann
  return h.hexdigest()
996 d51ae04c Michael Hanselmann
997 d51ae04c Michael Hanselmann
998 5d97d6dd Michael Hanselmann
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
999 5d97d6dd Michael Hanselmann
                         instance, all_transfers):
1000 5d97d6dd Michael Hanselmann
  """Transfers an instance's data from one node to another.
1001 5d97d6dd Michael Hanselmann

1002 5d97d6dd Michael Hanselmann
  @param lu: Logical unit instance
1003 5d97d6dd Michael Hanselmann
  @param feedback_fn: Feedback function
1004 5d97d6dd Michael Hanselmann
  @type src_node: string
1005 5d97d6dd Michael Hanselmann
  @param src_node: Source node name
1006 5d97d6dd Michael Hanselmann
  @type dest_node: string
1007 5d97d6dd Michael Hanselmann
  @param dest_node: Destination node name
1008 5d97d6dd Michael Hanselmann
  @type dest_ip: string
1009 5d97d6dd Michael Hanselmann
  @param dest_ip: IP address of destination node
1010 5d97d6dd Michael Hanselmann
  @type instance: L{objects.Instance}
1011 5d97d6dd Michael Hanselmann
  @param instance: Instance object
1012 5d97d6dd Michael Hanselmann
  @type all_transfers: list of L{DiskTransfer} instances
1013 5d97d6dd Michael Hanselmann
  @param all_transfers: List of all disk transfers to be made
1014 5d97d6dd Michael Hanselmann
  @rtype: list
1015 5d97d6dd Michael Hanselmann
  @return: List with a boolean (True=successful, False=failed) for success for
1016 5d97d6dd Michael Hanselmann
           each transfer
1017 5d97d6dd Michael Hanselmann

1018 5d97d6dd Michael Hanselmann
  """
1019 5bb95572 Michael Hanselmann
  # Disable compression for all moves as these are all within the same cluster
1020 5bb95572 Michael Hanselmann
  compress = constants.IEC_NONE
1021 a5310c2a Michael Hanselmann
1022 a5310c2a Michael Hanselmann
  logging.debug("Source node %s, destination node %s, compression '%s'",
1023 a5310c2a Michael Hanselmann
                src_node, dest_node, compress)
1024 a5310c2a Michael Hanselmann
1025 5d97d6dd Michael Hanselmann
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1026 5d97d6dd Michael Hanselmann
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1027 d51ae04c Michael Hanselmann
                                  src_node, None, dest_node, dest_ip)
1028 5d97d6dd Michael Hanselmann
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1029 d51ae04c Michael Hanselmann
                                 src_node, src_cbs, dest_node, dest_ip)
1030 5d97d6dd Michael Hanselmann
1031 5d97d6dd Michael Hanselmann
  all_dtp = []
1032 5d97d6dd Michael Hanselmann
1033 d51ae04c Michael Hanselmann
  base_magic = utils.GenerateSecret(6)
1034 d51ae04c Michael Hanselmann
1035 5d97d6dd Michael Hanselmann
  ieloop = ImportExportLoop(lu)
1036 5d97d6dd Michael Hanselmann
  try:
1037 d51ae04c Michael Hanselmann
    for idx, transfer in enumerate(all_transfers):
1038 5d97d6dd Michael Hanselmann
      if transfer:
1039 5d97d6dd Michael Hanselmann
        feedback_fn("Exporting %s from %s to %s" %
1040 5d97d6dd Michael Hanselmann
                    (transfer.name, src_node, dest_node))
1041 5d97d6dd Michael Hanselmann
1042 d51ae04c Michael Hanselmann
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1043 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1044 d51ae04c Michael Hanselmann
                                           compress=compress, magic=magic)
1045 d51ae04c Michael Hanselmann
1046 d51ae04c Michael Hanselmann
        dtp = _DiskTransferPrivate(transfer, True, opts)
1047 5d97d6dd Michael Hanselmann
1048 eb630f50 Michael Hanselmann
        di = DiskImport(lu, dest_node, opts, instance,
1049 5d97d6dd Michael Hanselmann
                        transfer.dest_io, transfer.dest_ioargs,
1050 5d97d6dd Michael Hanselmann
                        timeouts, dest_cbs, private=dtp)
1051 5d97d6dd Michael Hanselmann
        ieloop.Add(di)
1052 5d97d6dd Michael Hanselmann
1053 5d97d6dd Michael Hanselmann
        dtp.dest_import = di
1054 5d97d6dd Michael Hanselmann
      else:
1055 85b3901b Michael Hanselmann
        dtp = _DiskTransferPrivate(None, False, None)
1056 5d97d6dd Michael Hanselmann
1057 5d97d6dd Michael Hanselmann
      all_dtp.append(dtp)
1058 5d97d6dd Michael Hanselmann
1059 5d97d6dd Michael Hanselmann
    ieloop.Run()
1060 5d97d6dd Michael Hanselmann
  finally:
1061 5d97d6dd Michael Hanselmann
    ieloop.FinalizeAll()
1062 5d97d6dd Michael Hanselmann
1063 5d97d6dd Michael Hanselmann
  assert len(all_dtp) == len(all_transfers)
1064 403f5172 Guido Trotter
  assert compat.all((dtp.src_export is None or
1065 5d97d6dd Michael Hanselmann
                      dtp.src_export.success is not None) and
1066 5d97d6dd Michael Hanselmann
                     (dtp.dest_import is None or
1067 5d97d6dd Michael Hanselmann
                      dtp.dest_import.success is not None)
1068 403f5172 Guido Trotter
                     for dtp in all_dtp), \
1069 5d97d6dd Michael Hanselmann
         "Not all imports/exports are finalized"
1070 5d97d6dd Michael Hanselmann
1071 5d97d6dd Michael Hanselmann
  return [bool(dtp.success) for dtp in all_dtp]
1072 387794f8 Michael Hanselmann
1073 387794f8 Michael Hanselmann
1074 4a96f1d1 Michael Hanselmann
class _RemoteExportCb(ImportExportCbBase):
1075 4a96f1d1 Michael Hanselmann
  def __init__(self, feedback_fn, disk_count):
1076 4a96f1d1 Michael Hanselmann
    """Initializes this class.
1077 4a96f1d1 Michael Hanselmann

1078 4a96f1d1 Michael Hanselmann
    """
1079 4a96f1d1 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1080 4a96f1d1 Michael Hanselmann
    self._feedback_fn = feedback_fn
1081 4a96f1d1 Michael Hanselmann
    self._dresults = [None] * disk_count
1082 4a96f1d1 Michael Hanselmann
1083 4a96f1d1 Michael Hanselmann
  @property
1084 4a96f1d1 Michael Hanselmann
  def disk_results(self):
1085 4a96f1d1 Michael Hanselmann
    """Returns per-disk results.
1086 4a96f1d1 Michael Hanselmann

1087 4a96f1d1 Michael Hanselmann
    """
1088 4a96f1d1 Michael Hanselmann
    return self._dresults
1089 4a96f1d1 Michael Hanselmann
1090 4a96f1d1 Michael Hanselmann
  def ReportConnected(self, ie, private):
1091 4a96f1d1 Michael Hanselmann
    """Called when a connection has been established.
1092 4a96f1d1 Michael Hanselmann

1093 4a96f1d1 Michael Hanselmann
    """
1094 4a96f1d1 Michael Hanselmann
    (idx, _) = private
1095 4a96f1d1 Michael Hanselmann
1096 4a96f1d1 Michael Hanselmann
    self._feedback_fn("Disk %s is now sending data" % idx)
1097 4a96f1d1 Michael Hanselmann
1098 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
1099 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
1100 1a2e7fe9 Michael Hanselmann

1101 1a2e7fe9 Michael Hanselmann
    """
1102 1a2e7fe9 Michael Hanselmann
    (idx, _) = private
1103 1a2e7fe9 Michael Hanselmann
1104 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
1105 1a2e7fe9 Michael Hanselmann
    if not progress:
1106 1a2e7fe9 Michael Hanselmann
      return
1107 1a2e7fe9 Michael Hanselmann
1108 1a2e7fe9 Michael Hanselmann
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1109 1a2e7fe9 Michael Hanselmann
1110 4a96f1d1 Michael Hanselmann
  def ReportFinished(self, ie, private):
1111 4a96f1d1 Michael Hanselmann
    """Called when a transfer has finished.
1112 4a96f1d1 Michael Hanselmann

1113 4a96f1d1 Michael Hanselmann
    """
1114 4a96f1d1 Michael Hanselmann
    (idx, finished_fn) = private
1115 4a96f1d1 Michael Hanselmann
1116 4a96f1d1 Michael Hanselmann
    if ie.success:
1117 4a96f1d1 Michael Hanselmann
      self._feedback_fn("Disk %s finished sending data" % idx)
1118 4a96f1d1 Michael Hanselmann
    else:
1119 4a96f1d1 Michael Hanselmann
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
1120 4a96f1d1 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1121 4a96f1d1 Michael Hanselmann
1122 4a96f1d1 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1123 4a96f1d1 Michael Hanselmann
1124 4a96f1d1 Michael Hanselmann
    if finished_fn:
1125 4a96f1d1 Michael Hanselmann
      finished_fn()
1126 4a96f1d1 Michael Hanselmann
1127 4a96f1d1 Michael Hanselmann
1128 387794f8 Michael Hanselmann
class ExportInstanceHelper:
1129 387794f8 Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance):
1130 387794f8 Michael Hanselmann
    """Initializes this class.
1131 387794f8 Michael Hanselmann

1132 387794f8 Michael Hanselmann
    @param lu: Logical unit instance
1133 387794f8 Michael Hanselmann
    @param feedback_fn: Feedback function
1134 387794f8 Michael Hanselmann
    @type instance: L{objects.Instance}
1135 387794f8 Michael Hanselmann
    @param instance: Instance object
1136 387794f8 Michael Hanselmann

1137 387794f8 Michael Hanselmann
    """
1138 387794f8 Michael Hanselmann
    self._lu = lu
1139 387794f8 Michael Hanselmann
    self._feedback_fn = feedback_fn
1140 387794f8 Michael Hanselmann
    self._instance = instance
1141 387794f8 Michael Hanselmann
1142 387794f8 Michael Hanselmann
    self._snap_disks = []
1143 387794f8 Michael Hanselmann
    self._removed_snaps = [False] * len(instance.disks)
1144 387794f8 Michael Hanselmann
1145 387794f8 Michael Hanselmann
  def CreateSnapshots(self):
1146 387794f8 Michael Hanselmann
    """Creates an LVM snapshot for every disk of the instance.
1147 387794f8 Michael Hanselmann

1148 387794f8 Michael Hanselmann
    """
1149 387794f8 Michael Hanselmann
    assert not self._snap_disks
1150 387794f8 Michael Hanselmann
1151 387794f8 Michael Hanselmann
    instance = self._instance
1152 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1153 387794f8 Michael Hanselmann
1154 387794f8 Michael Hanselmann
    vgname = self._lu.cfg.GetVGName()
1155 387794f8 Michael Hanselmann
1156 387794f8 Michael Hanselmann
    for idx, disk in enumerate(instance.disks):
1157 387794f8 Michael Hanselmann
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1158 387794f8 Michael Hanselmann
                        (idx, src_node))
1159 387794f8 Michael Hanselmann
1160 387794f8 Michael Hanselmann
      # result.payload will be a snapshot of an lvm leaf of the one we
1161 387794f8 Michael Hanselmann
      # passed
1162 387794f8 Michael Hanselmann
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1163 387794f8 Michael Hanselmann
      msg = result.fail_msg
1164 387794f8 Michael Hanselmann
      if msg:
1165 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1166 387794f8 Michael Hanselmann
                            idx, src_node, msg)
1167 387794f8 Michael Hanselmann
        new_dev = False
1168 387794f8 Michael Hanselmann
      else:
1169 387794f8 Michael Hanselmann
        disk_id = (vgname, result.payload)
1170 387794f8 Michael Hanselmann
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1171 387794f8 Michael Hanselmann
                               logical_id=disk_id, physical_id=disk_id,
1172 387794f8 Michael Hanselmann
                               iv_name=disk.iv_name)
1173 387794f8 Michael Hanselmann
1174 387794f8 Michael Hanselmann
      self._snap_disks.append(new_dev)
1175 387794f8 Michael Hanselmann
1176 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1177 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(instance.disks)
1178 387794f8 Michael Hanselmann
1179 387794f8 Michael Hanselmann
  def _RemoveSnapshot(self, disk_index):
1180 387794f8 Michael Hanselmann
    """Removes an LVM snapshot.
1181 387794f8 Michael Hanselmann

1182 387794f8 Michael Hanselmann
    @type disk_index: number
1183 387794f8 Michael Hanselmann
    @param disk_index: Index of the snapshot to be removed
1184 387794f8 Michael Hanselmann

1185 387794f8 Michael Hanselmann
    """
1186 387794f8 Michael Hanselmann
    disk = self._snap_disks[disk_index]
1187 387794f8 Michael Hanselmann
    if disk and not self._removed_snaps[disk_index]:
1188 387794f8 Michael Hanselmann
      src_node = self._instance.primary_node
1189 387794f8 Michael Hanselmann
1190 387794f8 Michael Hanselmann
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1191 387794f8 Michael Hanselmann
                        (disk_index, src_node))
1192 387794f8 Michael Hanselmann
1193 387794f8 Michael Hanselmann
      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1194 387794f8 Michael Hanselmann
      if result.fail_msg:
1195 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1196 387794f8 Michael Hanselmann
                            " %s: %s", disk_index, src_node, result.fail_msg)
1197 387794f8 Michael Hanselmann
      else:
1198 387794f8 Michael Hanselmann
        self._removed_snaps[disk_index] = True
1199 387794f8 Michael Hanselmann
1200 387794f8 Michael Hanselmann
  def LocalExport(self, dest_node):
1201 387794f8 Michael Hanselmann
    """Intra-cluster instance export.
1202 387794f8 Michael Hanselmann

1203 387794f8 Michael Hanselmann
    @type dest_node: L{objects.Node}
1204 387794f8 Michael Hanselmann
    @param dest_node: Destination node
1205 387794f8 Michael Hanselmann

1206 387794f8 Michael Hanselmann
    """
1207 387794f8 Michael Hanselmann
    instance = self._instance
1208 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1209 387794f8 Michael Hanselmann
1210 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1211 387794f8 Michael Hanselmann
1212 387794f8 Michael Hanselmann
    transfers = []
1213 387794f8 Michael Hanselmann
1214 387794f8 Michael Hanselmann
    for idx, dev in enumerate(self._snap_disks):
1215 387794f8 Michael Hanselmann
      if not dev:
1216 387794f8 Michael Hanselmann
        transfers.append(None)
1217 387794f8 Michael Hanselmann
        continue
1218 387794f8 Michael Hanselmann
1219 387794f8 Michael Hanselmann
      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1220 387794f8 Michael Hanselmann
                            dev.physical_id[1])
1221 387794f8 Michael Hanselmann
1222 387794f8 Michael Hanselmann
      finished_fn = compat.partial(self._TransferFinished, idx)
1223 387794f8 Michael Hanselmann
1224 387794f8 Michael Hanselmann
      # FIXME: pass debug option from opcode to backend
1225 387794f8 Michael Hanselmann
      dt = DiskTransfer("snapshot/%s" % idx,
1226 387794f8 Michael Hanselmann
                        constants.IEIO_SCRIPT, (dev, idx),
1227 387794f8 Michael Hanselmann
                        constants.IEIO_FILE, (path, ),
1228 387794f8 Michael Hanselmann
                        finished_fn)
1229 387794f8 Michael Hanselmann
      transfers.append(dt)
1230 387794f8 Michael Hanselmann
1231 387794f8 Michael Hanselmann
    # Actually export data
1232 387794f8 Michael Hanselmann
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1233 387794f8 Michael Hanselmann
                                    src_node, dest_node.name,
1234 387794f8 Michael Hanselmann
                                    dest_node.secondary_ip,
1235 387794f8 Michael Hanselmann
                                    instance, transfers)
1236 387794f8 Michael Hanselmann
1237 387794f8 Michael Hanselmann
    assert len(dresults) == len(instance.disks)
1238 387794f8 Michael Hanselmann
1239 387794f8 Michael Hanselmann
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1240 387794f8 Michael Hanselmann
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1241 387794f8 Michael Hanselmann
                                               self._snap_disks)
1242 387794f8 Michael Hanselmann
    msg = result.fail_msg
1243 387794f8 Michael Hanselmann
    fin_resu = not msg
1244 387794f8 Michael Hanselmann
    if msg:
1245 387794f8 Michael Hanselmann
      self._lu.LogWarning("Could not finalize export for instance %s"
1246 387794f8 Michael Hanselmann
                          " on node %s: %s", instance.name, dest_node.name, msg)
1247 387794f8 Michael Hanselmann
1248 387794f8 Michael Hanselmann
    return (fin_resu, dresults)
1249 387794f8 Michael Hanselmann
1250 d51ae04c Michael Hanselmann
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1251 4a96f1d1 Michael Hanselmann
    """Inter-cluster instance export.
1252 4a96f1d1 Michael Hanselmann

1253 4a96f1d1 Michael Hanselmann
    @type disk_info: list
1254 4a96f1d1 Michael Hanselmann
    @param disk_info: Per-disk destination information
1255 d51ae04c Michael Hanselmann
    @type key_name: string
1256 d51ae04c Michael Hanselmann
    @param key_name: Name of X509 key to use
1257 d51ae04c Michael Hanselmann
    @type dest_ca_pem: string
1258 d51ae04c Michael Hanselmann
    @param dest_ca_pem: Destination X509 CA in PEM format
1259 4a96f1d1 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
1260 4a96f1d1 Michael Hanselmann
    @param timeouts: Timeouts for this import
1261 4a96f1d1 Michael Hanselmann

1262 4a96f1d1 Michael Hanselmann
    """
1263 4a96f1d1 Michael Hanselmann
    instance = self._instance
1264 4a96f1d1 Michael Hanselmann
1265 4a96f1d1 Michael Hanselmann
    assert len(disk_info) == len(instance.disks)
1266 4a96f1d1 Michael Hanselmann
1267 4a96f1d1 Michael Hanselmann
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1268 4a96f1d1 Michael Hanselmann
1269 4a96f1d1 Michael Hanselmann
    ieloop = ImportExportLoop(self._lu)
1270 4a96f1d1 Michael Hanselmann
    try:
1271 d51ae04c Michael Hanselmann
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1272 d51ae04c Michael Hanselmann
                                                           disk_info)):
1273 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=key_name,
1274 d51ae04c Michael Hanselmann
                                           ca_pem=dest_ca_pem,
1275 d51ae04c Michael Hanselmann
                                           magic=magic)
1276 d51ae04c Michael Hanselmann
1277 4a96f1d1 Michael Hanselmann
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1278 4a96f1d1 Michael Hanselmann
        finished_fn = compat.partial(self._TransferFinished, idx)
1279 4a96f1d1 Michael Hanselmann
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1280 eb630f50 Michael Hanselmann
                              opts, host, port, instance,
1281 4a96f1d1 Michael Hanselmann
                              constants.IEIO_SCRIPT, (dev, idx),
1282 4a96f1d1 Michael Hanselmann
                              timeouts, cbs, private=(idx, finished_fn)))
1283 4a96f1d1 Michael Hanselmann
1284 4a96f1d1 Michael Hanselmann
      ieloop.Run()
1285 4a96f1d1 Michael Hanselmann
    finally:
1286 4a96f1d1 Michael Hanselmann
      ieloop.FinalizeAll()
1287 4a96f1d1 Michael Hanselmann
1288 4a96f1d1 Michael Hanselmann
    return (True, cbs.disk_results)
1289 4a96f1d1 Michael Hanselmann
1290 387794f8 Michael Hanselmann
  def _TransferFinished(self, idx):
1291 387794f8 Michael Hanselmann
    """Called once a transfer has finished.
1292 387794f8 Michael Hanselmann

1293 387794f8 Michael Hanselmann
    @type idx: number
1294 387794f8 Michael Hanselmann
    @param idx: Disk index
1295 387794f8 Michael Hanselmann

1296 387794f8 Michael Hanselmann
    """
1297 387794f8 Michael Hanselmann
    logging.debug("Transfer %s finished", idx)
1298 387794f8 Michael Hanselmann
    self._RemoveSnapshot(idx)
1299 387794f8 Michael Hanselmann
1300 387794f8 Michael Hanselmann
  def Cleanup(self):
1301 387794f8 Michael Hanselmann
    """Remove all snapshots.
1302 387794f8 Michael Hanselmann

1303 387794f8 Michael Hanselmann
    """
1304 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(self._instance.disks)
1305 387794f8 Michael Hanselmann
    for idx in range(len(self._instance.disks)):
1306 387794f8 Michael Hanselmann
      self._RemoveSnapshot(idx)
1307 1410fa8d Michael Hanselmann
1308 1410fa8d Michael Hanselmann
1309 9bf56d77 Michael Hanselmann
class _RemoteImportCb(ImportExportCbBase):
1310 9bf56d77 Michael Hanselmann
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1311 9bf56d77 Michael Hanselmann
               external_address):
1312 9bf56d77 Michael Hanselmann
    """Initializes this class.
1313 9bf56d77 Michael Hanselmann

1314 9bf56d77 Michael Hanselmann
    @type cds: string
1315 9bf56d77 Michael Hanselmann
    @param cds: Cluster domain secret
1316 9bf56d77 Michael Hanselmann
    @type x509_cert_pem: string
1317 9bf56d77 Michael Hanselmann
    @param x509_cert_pem: CA used for signing import key
1318 9bf56d77 Michael Hanselmann
    @type disk_count: number
1319 9bf56d77 Michael Hanselmann
    @param disk_count: Number of disks
1320 9bf56d77 Michael Hanselmann
    @type external_address: string
1321 9bf56d77 Michael Hanselmann
    @param external_address: External address of destination node
1322 9bf56d77 Michael Hanselmann

1323 9bf56d77 Michael Hanselmann
    """
1324 9bf56d77 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1325 9bf56d77 Michael Hanselmann
    self._feedback_fn = feedback_fn
1326 9bf56d77 Michael Hanselmann
    self._cds = cds
1327 9bf56d77 Michael Hanselmann
    self._x509_cert_pem = x509_cert_pem
1328 9bf56d77 Michael Hanselmann
    self._disk_count = disk_count
1329 9bf56d77 Michael Hanselmann
    self._external_address = external_address
1330 9bf56d77 Michael Hanselmann
1331 9bf56d77 Michael Hanselmann
    self._dresults = [None] * disk_count
1332 9bf56d77 Michael Hanselmann
    self._daemon_port = [None] * disk_count
1333 9bf56d77 Michael Hanselmann
1334 9bf56d77 Michael Hanselmann
    self._salt = utils.GenerateSecret(8)
1335 9bf56d77 Michael Hanselmann
1336 9bf56d77 Michael Hanselmann
  @property
1337 9bf56d77 Michael Hanselmann
  def disk_results(self):
1338 9bf56d77 Michael Hanselmann
    """Returns per-disk results.
1339 9bf56d77 Michael Hanselmann

1340 9bf56d77 Michael Hanselmann
    """
1341 9bf56d77 Michael Hanselmann
    return self._dresults
1342 9bf56d77 Michael Hanselmann
1343 9bf56d77 Michael Hanselmann
  def _CheckAllListening(self):
1344 9bf56d77 Michael Hanselmann
    """Checks whether all daemons are listening.
1345 9bf56d77 Michael Hanselmann

1346 9bf56d77 Michael Hanselmann
    If all daemons are listening, the information is sent to the client.
1347 9bf56d77 Michael Hanselmann

1348 9bf56d77 Michael Hanselmann
    """
1349 9bf56d77 Michael Hanselmann
    if not compat.all(dp is not None for dp in self._daemon_port):
1350 9bf56d77 Michael Hanselmann
      return
1351 9bf56d77 Michael Hanselmann
1352 9bf56d77 Michael Hanselmann
    host = self._external_address
1353 9bf56d77 Michael Hanselmann
1354 9bf56d77 Michael Hanselmann
    disks = []
1355 d51ae04c Michael Hanselmann
    for idx, (port, magic) in enumerate(self._daemon_port):
1356 9bf56d77 Michael Hanselmann
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1357 d51ae04c Michael Hanselmann
                                               idx, host, port, magic))
1358 9bf56d77 Michael Hanselmann
1359 9bf56d77 Michael Hanselmann
    assert len(disks) == self._disk_count
1360 9bf56d77 Michael Hanselmann
1361 9bf56d77 Michael Hanselmann
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1362 9bf56d77 Michael Hanselmann
      "disks": disks,
1363 9bf56d77 Michael Hanselmann
      "x509_ca": self._x509_cert_pem,
1364 9bf56d77 Michael Hanselmann
      })
1365 9bf56d77 Michael Hanselmann
1366 9bf56d77 Michael Hanselmann
  def ReportListening(self, ie, private):
1367 9bf56d77 Michael Hanselmann
    """Called when daemon started listening.
1368 9bf56d77 Michael Hanselmann

1369 9bf56d77 Michael Hanselmann
    """
1370 9bf56d77 Michael Hanselmann
    (idx, ) = private
1371 9bf56d77 Michael Hanselmann
1372 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now listening" % idx)
1373 9bf56d77 Michael Hanselmann
1374 9bf56d77 Michael Hanselmann
    assert self._daemon_port[idx] is None
1375 9bf56d77 Michael Hanselmann
1376 d51ae04c Michael Hanselmann
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1377 9bf56d77 Michael Hanselmann
1378 9bf56d77 Michael Hanselmann
    self._CheckAllListening()
1379 9bf56d77 Michael Hanselmann
1380 9bf56d77 Michael Hanselmann
  def ReportConnected(self, ie, private):
1381 9bf56d77 Michael Hanselmann
    """Called when a connection has been established.
1382 9bf56d77 Michael Hanselmann

1383 9bf56d77 Michael Hanselmann
    """
1384 9bf56d77 Michael Hanselmann
    (idx, ) = private
1385 9bf56d77 Michael Hanselmann
1386 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now receiving data" % idx)
1387 9bf56d77 Michael Hanselmann
1388 9bf56d77 Michael Hanselmann
  def ReportFinished(self, ie, private):
1389 9bf56d77 Michael Hanselmann
    """Called when a transfer has finished.
1390 9bf56d77 Michael Hanselmann

1391 9bf56d77 Michael Hanselmann
    """
1392 9bf56d77 Michael Hanselmann
    (idx, ) = private
1393 9bf56d77 Michael Hanselmann
1394 9bf56d77 Michael Hanselmann
    # Daemon is certainly no longer listening
1395 9bf56d77 Michael Hanselmann
    self._daemon_port[idx] = None
1396 9bf56d77 Michael Hanselmann
1397 9bf56d77 Michael Hanselmann
    if ie.success:
1398 9bf56d77 Michael Hanselmann
      self._feedback_fn("Disk %s finished receiving data" % idx)
1399 9bf56d77 Michael Hanselmann
    else:
1400 9bf56d77 Michael Hanselmann
      self._feedback_fn(("Disk %s failed to receive data: %s"
1401 9bf56d77 Michael Hanselmann
                         " (recent output: %r)") %
1402 9bf56d77 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1403 9bf56d77 Michael Hanselmann
1404 9bf56d77 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1405 9bf56d77 Michael Hanselmann
1406 9bf56d77 Michael Hanselmann
1407 9bf56d77 Michael Hanselmann
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
1408 9bf56d77 Michael Hanselmann
  """Imports an instance from another cluster.
1409 9bf56d77 Michael Hanselmann

1410 9bf56d77 Michael Hanselmann
  @param lu: Logical unit instance
1411 9bf56d77 Michael Hanselmann
  @param feedback_fn: Feedback function
1412 9bf56d77 Michael Hanselmann
  @type instance: L{objects.Instance}
1413 9bf56d77 Michael Hanselmann
  @param instance: Instance object
1414 9bf56d77 Michael Hanselmann
  @type source_x509_ca: OpenSSL.crypto.X509
1415 9bf56d77 Michael Hanselmann
  @param source_x509_ca: Import source's X509 CA
1416 9bf56d77 Michael Hanselmann
  @type cds: string
1417 9bf56d77 Michael Hanselmann
  @param cds: Cluster domain secret
1418 9bf56d77 Michael Hanselmann
  @type timeouts: L{ImportExportTimeouts}
1419 9bf56d77 Michael Hanselmann
  @param timeouts: Timeouts for this import
1420 9bf56d77 Michael Hanselmann

1421 9bf56d77 Michael Hanselmann
  """
1422 9bf56d77 Michael Hanselmann
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1423 9bf56d77 Michael Hanselmann
                                                  source_x509_ca)
1424 9bf56d77 Michael Hanselmann
1425 d51ae04c Michael Hanselmann
  magic_base = utils.GenerateSecret(6)
1426 d51ae04c Michael Hanselmann
1427 9bf56d77 Michael Hanselmann
  # Create crypto key
1428 9bf56d77 Michael Hanselmann
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1429 9bf56d77 Michael Hanselmann
                                        constants.RIE_CERT_VALIDITY)
1430 9bf56d77 Michael Hanselmann
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1431 9bf56d77 Michael Hanselmann
1432 9bf56d77 Michael Hanselmann
  (x509_key_name, x509_cert_pem) = result.payload
1433 9bf56d77 Michael Hanselmann
  try:
1434 9bf56d77 Michael Hanselmann
    # Load certificate
1435 9bf56d77 Michael Hanselmann
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1436 9bf56d77 Michael Hanselmann
                                                x509_cert_pem)
1437 9bf56d77 Michael Hanselmann
1438 9bf56d77 Michael Hanselmann
    # Sign certificate
1439 9bf56d77 Michael Hanselmann
    signed_x509_cert_pem = \
1440 9bf56d77 Michael Hanselmann
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1441 9bf56d77 Michael Hanselmann
1442 9bf56d77 Michael Hanselmann
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1443 9bf56d77 Michael Hanselmann
                          len(instance.disks), instance.primary_node)
1444 9bf56d77 Michael Hanselmann
1445 9bf56d77 Michael Hanselmann
    ieloop = ImportExportLoop(lu)
1446 9bf56d77 Michael Hanselmann
    try:
1447 9bf56d77 Michael Hanselmann
      for idx, dev in enumerate(instance.disks):
1448 d51ae04c Michael Hanselmann
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1449 d51ae04c Michael Hanselmann
1450 d51ae04c Michael Hanselmann
        # Import daemon options
1451 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1452 d51ae04c Michael Hanselmann
                                           ca_pem=source_ca_pem,
1453 d51ae04c Michael Hanselmann
                                           magic=magic)
1454 d51ae04c Michael Hanselmann
1455 eb630f50 Michael Hanselmann
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1456 9bf56d77 Michael Hanselmann
                              constants.IEIO_SCRIPT, (dev, idx),
1457 9bf56d77 Michael Hanselmann
                              timeouts, cbs, private=(idx, )))
1458 9bf56d77 Michael Hanselmann
1459 9bf56d77 Michael Hanselmann
      ieloop.Run()
1460 9bf56d77 Michael Hanselmann
    finally:
1461 9bf56d77 Michael Hanselmann
      ieloop.FinalizeAll()
1462 9bf56d77 Michael Hanselmann
  finally:
1463 9bf56d77 Michael Hanselmann
    # Remove crypto key and certificate
1464 9bf56d77 Michael Hanselmann
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1465 9bf56d77 Michael Hanselmann
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1466 9bf56d77 Michael Hanselmann
1467 9bf56d77 Michael Hanselmann
  return cbs.disk_results
1468 9bf56d77 Michael Hanselmann
1469 9bf56d77 Michael Hanselmann
1470 1410fa8d Michael Hanselmann
def _GetImportExportHandshakeMessage(version):
1471 1410fa8d Michael Hanselmann
  """Returns the handshake message for a RIE protocol version.
1472 1410fa8d Michael Hanselmann

1473 1410fa8d Michael Hanselmann
  @type version: number
1474 1410fa8d Michael Hanselmann

1475 1410fa8d Michael Hanselmann
  """
1476 1410fa8d Michael Hanselmann
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1477 1410fa8d Michael Hanselmann
1478 1410fa8d Michael Hanselmann
1479 1410fa8d Michael Hanselmann
def ComputeRemoteExportHandshake(cds):
1480 1410fa8d Michael Hanselmann
  """Computes the remote import/export handshake.
1481 1410fa8d Michael Hanselmann

1482 1410fa8d Michael Hanselmann
  @type cds: string
1483 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1484 1410fa8d Michael Hanselmann

1485 1410fa8d Michael Hanselmann
  """
1486 1410fa8d Michael Hanselmann
  salt = utils.GenerateSecret(8)
1487 1410fa8d Michael Hanselmann
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1488 1410fa8d Michael Hanselmann
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1489 1410fa8d Michael Hanselmann
1490 1410fa8d Michael Hanselmann
1491 1410fa8d Michael Hanselmann
def CheckRemoteExportHandshake(cds, handshake):
1492 1410fa8d Michael Hanselmann
  """Checks the handshake of a remote import/export.
1493 1410fa8d Michael Hanselmann

1494 1410fa8d Michael Hanselmann
  @type cds: string
1495 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1496 1410fa8d Michael Hanselmann
  @type handshake: sequence
1497 1410fa8d Michael Hanselmann
  @param handshake: Handshake sent by remote peer
1498 1410fa8d Michael Hanselmann

1499 1410fa8d Michael Hanselmann
  """
1500 1410fa8d Michael Hanselmann
  try:
1501 1410fa8d Michael Hanselmann
    (version, hmac_digest, hmac_salt) = handshake
1502 1410fa8d Michael Hanselmann
  except (TypeError, ValueError), err:
1503 1410fa8d Michael Hanselmann
    return "Invalid data: %s" % err
1504 1410fa8d Michael Hanselmann
1505 1410fa8d Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1506 1410fa8d Michael Hanselmann
                              hmac_digest, salt=hmac_salt):
1507 1410fa8d Michael Hanselmann
    return "Hash didn't match, clusters don't share the same domain secret"
1508 1410fa8d Michael Hanselmann
1509 1410fa8d Michael Hanselmann
  if version != constants.RIE_VERSION:
1510 1410fa8d Michael Hanselmann
    return ("Clusters don't have the same remote import/export protocol"
1511 1410fa8d Michael Hanselmann
            " (local=%s, remote=%s)" %
1512 1410fa8d Michael Hanselmann
            (constants.RIE_VERSION, version))
1513 1410fa8d Michael Hanselmann
1514 1410fa8d Michael Hanselmann
  return None
1515 4a96f1d1 Michael Hanselmann
1516 4a96f1d1 Michael Hanselmann
1517 d51ae04c Michael Hanselmann
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1518 4a96f1d1 Michael Hanselmann
  """Returns the hashed text for import/export disk information.
1519 4a96f1d1 Michael Hanselmann

1520 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1521 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1522 4a96f1d1 Michael Hanselmann
  @type host: string
1523 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1524 4a96f1d1 Michael Hanselmann
  @type port: number
1525 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1526 d51ae04c Michael Hanselmann
  @type magic: string
1527 d51ae04c Michael Hanselmann
  @param magic: Magic value
1528 4a96f1d1 Michael Hanselmann

1529 4a96f1d1 Michael Hanselmann
  """
1530 d51ae04c Michael Hanselmann
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1531 4a96f1d1 Michael Hanselmann
1532 4a96f1d1 Michael Hanselmann
1533 4a96f1d1 Michael Hanselmann
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1534 4a96f1d1 Michael Hanselmann
  """Verifies received disk information for an export.
1535 4a96f1d1 Michael Hanselmann

1536 4a96f1d1 Michael Hanselmann
  @type cds: string
1537 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1538 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1539 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1540 4a96f1d1 Michael Hanselmann
  @type disk_info: sequence
1541 4a96f1d1 Michael Hanselmann
  @param disk_info: Disk information sent by remote peer
1542 4a96f1d1 Michael Hanselmann

1543 4a96f1d1 Michael Hanselmann
  """
1544 4a96f1d1 Michael Hanselmann
  try:
1545 d51ae04c Michael Hanselmann
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1546 4a96f1d1 Michael Hanselmann
  except (TypeError, ValueError), err:
1547 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("Invalid data: %s" % err)
1548 4a96f1d1 Michael Hanselmann
1549 d51ae04c Michael Hanselmann
  if not (host and port and magic):
1550 d51ae04c Michael Hanselmann
    raise errors.GenericError("Missing destination host, port or magic")
1551 4a96f1d1 Michael Hanselmann
1552 d51ae04c Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1553 4a96f1d1 Michael Hanselmann
1554 4a96f1d1 Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1555 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("HMAC is wrong")
1556 4a96f1d1 Michael Hanselmann
1557 b705c7a6 Manuel Franceschini
  return (netutils.Hostname.GetNormalizedName(host),
1558 d51ae04c Michael Hanselmann
          utils.ValidateServiceName(port),
1559 d51ae04c Michael Hanselmann
          magic)
1560 4a96f1d1 Michael Hanselmann
1561 4a96f1d1 Michael Hanselmann
1562 d51ae04c Michael Hanselmann
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1563 4a96f1d1 Michael Hanselmann
  """Computes the signed disk information for a remote import.
1564 4a96f1d1 Michael Hanselmann

1565 4a96f1d1 Michael Hanselmann
  @type cds: string
1566 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1567 4a96f1d1 Michael Hanselmann
  @type salt: string
1568 4a96f1d1 Michael Hanselmann
  @param salt: HMAC salt
1569 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1570 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1571 4a96f1d1 Michael Hanselmann
  @type host: string
1572 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1573 4a96f1d1 Michael Hanselmann
  @type port: number
1574 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1575 d51ae04c Michael Hanselmann
  @type magic: string
1576 d51ae04c Michael Hanselmann
  @param magic: Magic value
1577 4a96f1d1 Michael Hanselmann

1578 4a96f1d1 Michael Hanselmann
  """
1579 d51ae04c Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1580 4a96f1d1 Michael Hanselmann
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1581 d51ae04c Michael Hanselmann
  return (host, port, magic, hmac_digest, salt)