Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ cf00dba0

History | View | Annotate | Download (44.8 kB)

1 033a1d00 Michael Hanselmann
#
2 033a1d00 Michael Hanselmann
#
3 033a1d00 Michael Hanselmann
4 c9300bb3 Iustin Pop
# Copyright (C) 2010, 2011 Google Inc.
5 033a1d00 Michael Hanselmann
#
6 033a1d00 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 033a1d00 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 033a1d00 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 033a1d00 Michael Hanselmann
# (at your option) any later version.
10 033a1d00 Michael Hanselmann
#
11 033a1d00 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 033a1d00 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 033a1d00 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 033a1d00 Michael Hanselmann
# General Public License for more details.
15 033a1d00 Michael Hanselmann
#
16 033a1d00 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 033a1d00 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 033a1d00 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 033a1d00 Michael Hanselmann
# 02110-1301, USA.
20 033a1d00 Michael Hanselmann
21 033a1d00 Michael Hanselmann
22 033a1d00 Michael Hanselmann
"""Instance-related functions and classes for masterd.
23 033a1d00 Michael Hanselmann

24 033a1d00 Michael Hanselmann
"""
25 033a1d00 Michael Hanselmann
26 033a1d00 Michael Hanselmann
import logging
27 033a1d00 Michael Hanselmann
import time
28 4a96f1d1 Michael Hanselmann
import OpenSSL
29 033a1d00 Michael Hanselmann
30 033a1d00 Michael Hanselmann
from ganeti import constants
31 033a1d00 Michael Hanselmann
from ganeti import errors
32 033a1d00 Michael Hanselmann
from ganeti import compat
33 387794f8 Michael Hanselmann
from ganeti import utils
34 387794f8 Michael Hanselmann
from ganeti import objects
35 a744b676 Manuel Franceschini
from ganeti import netutils
36 033a1d00 Michael Hanselmann
37 033a1d00 Michael Hanselmann
38 033a1d00 Michael Hanselmann
class _ImportExportError(Exception):
39 033a1d00 Michael Hanselmann
  """Local exception to report import/export errors.
40 033a1d00 Michael Hanselmann

41 033a1d00 Michael Hanselmann
  """
42 033a1d00 Michael Hanselmann
43 033a1d00 Michael Hanselmann
44 033a1d00 Michael Hanselmann
class ImportExportTimeouts(object):
45 033a1d00 Michael Hanselmann
  #: Time until daemon starts writing status file
46 033a1d00 Michael Hanselmann
  DEFAULT_READY_TIMEOUT = 10
47 033a1d00 Michael Hanselmann
48 033a1d00 Michael Hanselmann
  #: Length of time until errors cause hard failure
49 033a1d00 Michael Hanselmann
  DEFAULT_ERROR_TIMEOUT = 10
50 033a1d00 Michael Hanselmann
51 033a1d00 Michael Hanselmann
  #: Time after which daemon must be listening
52 033a1d00 Michael Hanselmann
  DEFAULT_LISTEN_TIMEOUT = 10
53 033a1d00 Michael Hanselmann
54 1a2e7fe9 Michael Hanselmann
  #: Progress update interval
55 1a2e7fe9 Michael Hanselmann
  DEFAULT_PROGRESS_INTERVAL = 60
56 1a2e7fe9 Michael Hanselmann
57 033a1d00 Michael Hanselmann
  __slots__ = [
58 033a1d00 Michael Hanselmann
    "error",
59 033a1d00 Michael Hanselmann
    "ready",
60 033a1d00 Michael Hanselmann
    "listen",
61 033a1d00 Michael Hanselmann
    "connect",
62 1a2e7fe9 Michael Hanselmann
    "progress",
63 033a1d00 Michael Hanselmann
    ]
64 033a1d00 Michael Hanselmann
65 033a1d00 Michael Hanselmann
  def __init__(self, connect,
66 033a1d00 Michael Hanselmann
               listen=DEFAULT_LISTEN_TIMEOUT,
67 033a1d00 Michael Hanselmann
               error=DEFAULT_ERROR_TIMEOUT,
68 1a2e7fe9 Michael Hanselmann
               ready=DEFAULT_READY_TIMEOUT,
69 1a2e7fe9 Michael Hanselmann
               progress=DEFAULT_PROGRESS_INTERVAL):
70 033a1d00 Michael Hanselmann
    """Initializes this class.
71 033a1d00 Michael Hanselmann

72 033a1d00 Michael Hanselmann
    @type connect: number
73 033a1d00 Michael Hanselmann
    @param connect: Timeout for establishing connection
74 033a1d00 Michael Hanselmann
    @type listen: number
75 033a1d00 Michael Hanselmann
    @param listen: Timeout for starting to listen for connections
76 033a1d00 Michael Hanselmann
    @type error: number
77 033a1d00 Michael Hanselmann
    @param error: Length of time until errors cause hard failure
78 033a1d00 Michael Hanselmann
    @type ready: number
79 033a1d00 Michael Hanselmann
    @param ready: Timeout for daemon to become ready
80 1a2e7fe9 Michael Hanselmann
    @type progress: number
81 1a2e7fe9 Michael Hanselmann
    @param progress: Progress update interval
82 033a1d00 Michael Hanselmann

83 033a1d00 Michael Hanselmann
    """
84 033a1d00 Michael Hanselmann
    self.error = error
85 033a1d00 Michael Hanselmann
    self.ready = ready
86 033a1d00 Michael Hanselmann
    self.listen = listen
87 033a1d00 Michael Hanselmann
    self.connect = connect
88 1a2e7fe9 Michael Hanselmann
    self.progress = progress
89 033a1d00 Michael Hanselmann
90 033a1d00 Michael Hanselmann
91 033a1d00 Michael Hanselmann
class ImportExportCbBase(object):
92 033a1d00 Michael Hanselmann
  """Callbacks for disk import/export.
93 033a1d00 Michael Hanselmann

94 033a1d00 Michael Hanselmann
  """
95 5e26c4d9 Iustin Pop
  def ReportListening(self, ie, private, component):
96 033a1d00 Michael Hanselmann
    """Called when daemon started listening.
97 033a1d00 Michael Hanselmann

98 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
99 033a1d00 Michael Hanselmann
    @param ie: Import/export object
100 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
101 5e26c4d9 Iustin Pop
    @param component: transfer component name
102 033a1d00 Michael Hanselmann

103 033a1d00 Michael Hanselmann
    """
104 033a1d00 Michael Hanselmann
105 033a1d00 Michael Hanselmann
  def ReportConnected(self, ie, private):
106 033a1d00 Michael Hanselmann
    """Called when a connection has been established.
107 033a1d00 Michael Hanselmann

108 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
109 033a1d00 Michael Hanselmann
    @param ie: Import/export object
110 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
111 033a1d00 Michael Hanselmann

112 033a1d00 Michael Hanselmann
    """
113 033a1d00 Michael Hanselmann
114 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
115 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
116 1a2e7fe9 Michael Hanselmann

117 1a2e7fe9 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
118 1a2e7fe9 Michael Hanselmann
    @param ie: Import/export object
119 1a2e7fe9 Michael Hanselmann
    @param private: Private data passed to import/export object
120 1a2e7fe9 Michael Hanselmann

121 1a2e7fe9 Michael Hanselmann
    """
122 1a2e7fe9 Michael Hanselmann
123 033a1d00 Michael Hanselmann
  def ReportFinished(self, ie, private):
124 033a1d00 Michael Hanselmann
    """Called when a transfer has finished.
125 033a1d00 Michael Hanselmann

126 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
127 033a1d00 Michael Hanselmann
    @param ie: Import/export object
128 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
129 033a1d00 Michael Hanselmann

130 033a1d00 Michael Hanselmann
    """
131 033a1d00 Michael Hanselmann
132 033a1d00 Michael Hanselmann
133 033a1d00 Michael Hanselmann
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
134 033a1d00 Michael Hanselmann
  """Checks whether a timeout has expired.
135 033a1d00 Michael Hanselmann

136 033a1d00 Michael Hanselmann
  """
137 033a1d00 Michael Hanselmann
  return _time_fn() > (epoch + timeout)
138 033a1d00 Michael Hanselmann
139 033a1d00 Michael Hanselmann
140 033a1d00 Michael Hanselmann
class _DiskImportExportBase(object):
141 033a1d00 Michael Hanselmann
  MODE_TEXT = None
142 033a1d00 Michael Hanselmann
143 eb630f50 Michael Hanselmann
  def __init__(self, lu, node_name, opts,
144 5e26c4d9 Iustin Pop
               instance, component, timeouts, cbs, private=None):
145 033a1d00 Michael Hanselmann
    """Initializes this class.
146 033a1d00 Michael Hanselmann

147 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
148 033a1d00 Michael Hanselmann
    @type node_name: string
149 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
150 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
151 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
152 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
153 033a1d00 Michael Hanselmann
    @param instance: Instance object
154 5e26c4d9 Iustin Pop
    @type component: string
155 5e26c4d9 Iustin Pop
    @param component: which part of the instance is being imported
156 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
157 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
158 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
159 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
160 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
161 033a1d00 Michael Hanselmann

162 033a1d00 Michael Hanselmann
    """
163 033a1d00 Michael Hanselmann
    assert self.MODE_TEXT
164 033a1d00 Michael Hanselmann
165 033a1d00 Michael Hanselmann
    self._lu = lu
166 033a1d00 Michael Hanselmann
    self.node_name = node_name
167 4478301b Michael Hanselmann
    self._opts = opts.Copy()
168 033a1d00 Michael Hanselmann
    self._instance = instance
169 5e26c4d9 Iustin Pop
    self._component = component
170 033a1d00 Michael Hanselmann
    self._timeouts = timeouts
171 033a1d00 Michael Hanselmann
    self._cbs = cbs
172 033a1d00 Michael Hanselmann
    self._private = private
173 033a1d00 Michael Hanselmann
174 4478301b Michael Hanselmann
    # Set master daemon's timeout in options for import/export daemon
175 4478301b Michael Hanselmann
    assert self._opts.connect_timeout is None
176 4478301b Michael Hanselmann
    self._opts.connect_timeout = timeouts.connect
177 4478301b Michael Hanselmann
178 033a1d00 Michael Hanselmann
    # Parent loop
179 033a1d00 Michael Hanselmann
    self._loop = None
180 033a1d00 Michael Hanselmann
181 033a1d00 Michael Hanselmann
    # Timestamps
182 033a1d00 Michael Hanselmann
    self._ts_begin = None
183 033a1d00 Michael Hanselmann
    self._ts_connected = None
184 033a1d00 Michael Hanselmann
    self._ts_finished = None
185 033a1d00 Michael Hanselmann
    self._ts_cleanup = None
186 1a2e7fe9 Michael Hanselmann
    self._ts_last_progress = None
187 033a1d00 Michael Hanselmann
    self._ts_last_error = None
188 033a1d00 Michael Hanselmann
189 033a1d00 Michael Hanselmann
    # Transfer status
190 033a1d00 Michael Hanselmann
    self.success = None
191 033a1d00 Michael Hanselmann
    self.final_message = None
192 033a1d00 Michael Hanselmann
193 033a1d00 Michael Hanselmann
    # Daemon status
194 033a1d00 Michael Hanselmann
    self._daemon_name = None
195 033a1d00 Michael Hanselmann
    self._daemon = None
196 033a1d00 Michael Hanselmann
197 033a1d00 Michael Hanselmann
  @property
198 033a1d00 Michael Hanselmann
  def recent_output(self):
199 033a1d00 Michael Hanselmann
    """Returns the most recent output from the daemon.
200 033a1d00 Michael Hanselmann

201 033a1d00 Michael Hanselmann
    """
202 033a1d00 Michael Hanselmann
    if self._daemon:
203 c9300bb3 Iustin Pop
      return "\n".join(self._daemon.recent_output)
204 033a1d00 Michael Hanselmann
205 033a1d00 Michael Hanselmann
    return None
206 033a1d00 Michael Hanselmann
207 033a1d00 Michael Hanselmann
  @property
208 1a2e7fe9 Michael Hanselmann
  def progress(self):
209 1a2e7fe9 Michael Hanselmann
    """Returns transfer progress information.
210 1a2e7fe9 Michael Hanselmann

211 1a2e7fe9 Michael Hanselmann
    """
212 1a2e7fe9 Michael Hanselmann
    if not self._daemon:
213 1a2e7fe9 Michael Hanselmann
      return None
214 1a2e7fe9 Michael Hanselmann
215 1a2e7fe9 Michael Hanselmann
    return (self._daemon.progress_mbytes,
216 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_throughput,
217 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_percent,
218 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_eta)
219 1a2e7fe9 Michael Hanselmann
220 1a2e7fe9 Michael Hanselmann
  @property
221 af1d39b1 Michael Hanselmann
  def magic(self):
222 af1d39b1 Michael Hanselmann
    """Returns the magic value for this import/export.
223 af1d39b1 Michael Hanselmann

224 af1d39b1 Michael Hanselmann
    """
225 af1d39b1 Michael Hanselmann
    return self._opts.magic
226 af1d39b1 Michael Hanselmann
227 af1d39b1 Michael Hanselmann
  @property
228 033a1d00 Michael Hanselmann
  def active(self):
229 033a1d00 Michael Hanselmann
    """Determines whether this transport is still active.
230 033a1d00 Michael Hanselmann

231 033a1d00 Michael Hanselmann
    """
232 033a1d00 Michael Hanselmann
    return self.success is None
233 033a1d00 Michael Hanselmann
234 033a1d00 Michael Hanselmann
  @property
235 033a1d00 Michael Hanselmann
  def loop(self):
236 033a1d00 Michael Hanselmann
    """Returns parent loop.
237 033a1d00 Michael Hanselmann

238 033a1d00 Michael Hanselmann
    @rtype: L{ImportExportLoop}
239 033a1d00 Michael Hanselmann

240 033a1d00 Michael Hanselmann
    """
241 033a1d00 Michael Hanselmann
    return self._loop
242 033a1d00 Michael Hanselmann
243 033a1d00 Michael Hanselmann
  def SetLoop(self, loop):
244 033a1d00 Michael Hanselmann
    """Sets the parent loop.
245 033a1d00 Michael Hanselmann

246 033a1d00 Michael Hanselmann
    @type loop: L{ImportExportLoop}
247 033a1d00 Michael Hanselmann

248 033a1d00 Michael Hanselmann
    """
249 033a1d00 Michael Hanselmann
    if self._loop:
250 033a1d00 Michael Hanselmann
      raise errors.ProgrammerError("Loop can only be set once")
251 033a1d00 Michael Hanselmann
252 033a1d00 Michael Hanselmann
    self._loop = loop
253 033a1d00 Michael Hanselmann
254 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
255 033a1d00 Michael Hanselmann
    """Starts the import/export daemon.
256 033a1d00 Michael Hanselmann

257 033a1d00 Michael Hanselmann
    """
258 033a1d00 Michael Hanselmann
    raise NotImplementedError()
259 033a1d00 Michael Hanselmann
260 033a1d00 Michael Hanselmann
  def CheckDaemon(self):
261 033a1d00 Michael Hanselmann
    """Checks whether daemon has been started and if not, starts it.
262 033a1d00 Michael Hanselmann

263 033a1d00 Michael Hanselmann
    @rtype: string
264 033a1d00 Michael Hanselmann
    @return: Daemon name
265 033a1d00 Michael Hanselmann

266 033a1d00 Michael Hanselmann
    """
267 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
268 033a1d00 Michael Hanselmann
269 033a1d00 Michael Hanselmann
    if self._daemon_name is None:
270 033a1d00 Michael Hanselmann
      assert self._ts_begin is None
271 033a1d00 Michael Hanselmann
272 033a1d00 Michael Hanselmann
      result = self._StartDaemon()
273 033a1d00 Michael Hanselmann
      if result.fail_msg:
274 033a1d00 Michael Hanselmann
        raise _ImportExportError("Failed to start %s on %s: %s" %
275 033a1d00 Michael Hanselmann
                                 (self.MODE_TEXT, self.node_name,
276 033a1d00 Michael Hanselmann
                                  result.fail_msg))
277 033a1d00 Michael Hanselmann
278 033a1d00 Michael Hanselmann
      daemon_name = result.payload
279 033a1d00 Michael Hanselmann
280 194e8648 Iustin Pop
      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
281 033a1d00 Michael Hanselmann
                   self.node_name)
282 033a1d00 Michael Hanselmann
283 033a1d00 Michael Hanselmann
      self._ts_begin = time.time()
284 033a1d00 Michael Hanselmann
      self._daemon_name = daemon_name
285 033a1d00 Michael Hanselmann
286 033a1d00 Michael Hanselmann
    return self._daemon_name
287 033a1d00 Michael Hanselmann
288 033a1d00 Michael Hanselmann
  def GetDaemonName(self):
289 033a1d00 Michael Hanselmann
    """Returns the daemon name.
290 033a1d00 Michael Hanselmann

291 033a1d00 Michael Hanselmann
    """
292 033a1d00 Michael Hanselmann
    assert self._daemon_name, "Daemon has not been started"
293 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
294 033a1d00 Michael Hanselmann
    return self._daemon_name
295 033a1d00 Michael Hanselmann
296 033a1d00 Michael Hanselmann
  def Abort(self):
297 033a1d00 Michael Hanselmann
    """Sends SIGTERM to import/export daemon (if still active).
298 033a1d00 Michael Hanselmann

299 033a1d00 Michael Hanselmann
    """
300 033a1d00 Michael Hanselmann
    if self._daemon_name:
301 194e8648 Iustin Pop
      self._lu.LogWarning("Aborting %s '%s' on %s",
302 033a1d00 Michael Hanselmann
                          self.MODE_TEXT, self._daemon_name, self.node_name)
303 033a1d00 Michael Hanselmann
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
304 033a1d00 Michael Hanselmann
      if result.fail_msg:
305 194e8648 Iustin Pop
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
306 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
307 033a1d00 Michael Hanselmann
                            self.node_name, result.fail_msg)
308 033a1d00 Michael Hanselmann
        return False
309 033a1d00 Michael Hanselmann
310 033a1d00 Michael Hanselmann
    return True
311 033a1d00 Michael Hanselmann
312 033a1d00 Michael Hanselmann
  def _SetDaemonData(self, data):
313 033a1d00 Michael Hanselmann
    """Internal function for updating status daemon data.
314 033a1d00 Michael Hanselmann

315 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
316 033a1d00 Michael Hanselmann
    @param data: Daemon status data
317 033a1d00 Michael Hanselmann

318 033a1d00 Michael Hanselmann
    """
319 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
320 033a1d00 Michael Hanselmann
321 033a1d00 Michael Hanselmann
    if not data:
322 033a1d00 Michael Hanselmann
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
323 033a1d00 Michael Hanselmann
        raise _ImportExportError("Didn't become ready after %s seconds" %
324 033a1d00 Michael Hanselmann
                                 self._timeouts.ready)
325 033a1d00 Michael Hanselmann
326 033a1d00 Michael Hanselmann
      return False
327 033a1d00 Michael Hanselmann
328 033a1d00 Michael Hanselmann
    self._daemon = data
329 033a1d00 Michael Hanselmann
330 033a1d00 Michael Hanselmann
    return True
331 033a1d00 Michael Hanselmann
332 033a1d00 Michael Hanselmann
  def SetDaemonData(self, success, data):
333 033a1d00 Michael Hanselmann
    """Updates daemon status data.
334 033a1d00 Michael Hanselmann

335 033a1d00 Michael Hanselmann
    @type success: bool
336 033a1d00 Michael Hanselmann
    @param success: Whether fetching data was successful or not
337 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
338 033a1d00 Michael Hanselmann
    @param data: Daemon status data
339 033a1d00 Michael Hanselmann

340 033a1d00 Michael Hanselmann
    """
341 033a1d00 Michael Hanselmann
    if not success:
342 033a1d00 Michael Hanselmann
      if self._ts_last_error is None:
343 033a1d00 Michael Hanselmann
        self._ts_last_error = time.time()
344 033a1d00 Michael Hanselmann
345 033a1d00 Michael Hanselmann
      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
346 033a1d00 Michael Hanselmann
        raise _ImportExportError("Too many errors while updating data")
347 033a1d00 Michael Hanselmann
348 033a1d00 Michael Hanselmann
      return False
349 033a1d00 Michael Hanselmann
350 033a1d00 Michael Hanselmann
    self._ts_last_error = None
351 033a1d00 Michael Hanselmann
352 033a1d00 Michael Hanselmann
    return self._SetDaemonData(data)
353 033a1d00 Michael Hanselmann
354 033a1d00 Michael Hanselmann
  def CheckListening(self):
355 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
356 033a1d00 Michael Hanselmann

357 033a1d00 Michael Hanselmann
    """
358 033a1d00 Michael Hanselmann
    raise NotImplementedError()
359 033a1d00 Michael Hanselmann
360 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
361 033a1d00 Michael Hanselmann
    """Returns timeout to calculate connect timeout.
362 033a1d00 Michael Hanselmann

363 033a1d00 Michael Hanselmann
    """
364 033a1d00 Michael Hanselmann
    raise NotImplementedError()
365 033a1d00 Michael Hanselmann
366 033a1d00 Michael Hanselmann
  def CheckConnected(self):
367 033a1d00 Michael Hanselmann
    """Checks whether the daemon is connected.
368 033a1d00 Michael Hanselmann

369 033a1d00 Michael Hanselmann
    @rtype: bool
370 033a1d00 Michael Hanselmann
    @return: Whether the daemon is connected
371 033a1d00 Michael Hanselmann

372 033a1d00 Michael Hanselmann
    """
373 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
374 033a1d00 Michael Hanselmann
375 033a1d00 Michael Hanselmann
    if self._ts_connected is not None:
376 033a1d00 Michael Hanselmann
      return True
377 033a1d00 Michael Hanselmann
378 033a1d00 Michael Hanselmann
    if self._daemon.connected:
379 033a1d00 Michael Hanselmann
      self._ts_connected = time.time()
380 033a1d00 Michael Hanselmann
381 033a1d00 Michael Hanselmann
      # TODO: Log remote peer
382 194e8648 Iustin Pop
      logging.debug("%s '%s' on %s is now connected",
383 033a1d00 Michael Hanselmann
                    self.MODE_TEXT, self._daemon_name, self.node_name)
384 033a1d00 Michael Hanselmann
385 033a1d00 Michael Hanselmann
      self._cbs.ReportConnected(self, self._private)
386 033a1d00 Michael Hanselmann
387 033a1d00 Michael Hanselmann
      return True
388 033a1d00 Michael Hanselmann
389 033a1d00 Michael Hanselmann
    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
390 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not connected after %s seconds" %
391 033a1d00 Michael Hanselmann
                               self._timeouts.connect)
392 033a1d00 Michael Hanselmann
393 033a1d00 Michael Hanselmann
    return False
394 033a1d00 Michael Hanselmann
395 1a2e7fe9 Michael Hanselmann
  def _CheckProgress(self):
396 1a2e7fe9 Michael Hanselmann
    """Checks whether a progress update should be reported.
397 1a2e7fe9 Michael Hanselmann

398 1a2e7fe9 Michael Hanselmann
    """
399 1a2e7fe9 Michael Hanselmann
    if ((self._ts_last_progress is None or
400 1a2e7fe9 Michael Hanselmann
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
401 1a2e7fe9 Michael Hanselmann
        self._daemon and
402 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_mbytes is not None and
403 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_throughput is not None):
404 1a2e7fe9 Michael Hanselmann
      self._cbs.ReportProgress(self, self._private)
405 1a2e7fe9 Michael Hanselmann
      self._ts_last_progress = time.time()
406 1a2e7fe9 Michael Hanselmann
407 033a1d00 Michael Hanselmann
  def CheckFinished(self):
408 033a1d00 Michael Hanselmann
    """Checks whether the daemon exited.
409 033a1d00 Michael Hanselmann

410 033a1d00 Michael Hanselmann
    @rtype: bool
411 033a1d00 Michael Hanselmann
    @return: Whether the transfer is finished
412 033a1d00 Michael Hanselmann

413 033a1d00 Michael Hanselmann
    """
414 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
415 033a1d00 Michael Hanselmann
416 033a1d00 Michael Hanselmann
    if self._ts_finished:
417 033a1d00 Michael Hanselmann
      return True
418 033a1d00 Michael Hanselmann
419 033a1d00 Michael Hanselmann
    if self._daemon.exit_status is None:
420 1a2e7fe9 Michael Hanselmann
      # TODO: Adjust delay for ETA expiring soon
421 1a2e7fe9 Michael Hanselmann
      self._CheckProgress()
422 033a1d00 Michael Hanselmann
      return False
423 033a1d00 Michael Hanselmann
424 033a1d00 Michael Hanselmann
    self._ts_finished = time.time()
425 033a1d00 Michael Hanselmann
426 033a1d00 Michael Hanselmann
    self._ReportFinished(self._daemon.exit_status == 0,
427 033a1d00 Michael Hanselmann
                         self._daemon.error_message)
428 033a1d00 Michael Hanselmann
429 033a1d00 Michael Hanselmann
    return True
430 033a1d00 Michael Hanselmann
431 033a1d00 Michael Hanselmann
  def _ReportFinished(self, success, message):
432 033a1d00 Michael Hanselmann
    """Transfer is finished or daemon exited.
433 033a1d00 Michael Hanselmann

434 033a1d00 Michael Hanselmann
    @type success: bool
435 033a1d00 Michael Hanselmann
    @param success: Whether the transfer was successful
436 033a1d00 Michael Hanselmann
    @type message: string
437 033a1d00 Michael Hanselmann
    @param message: Error message
438 033a1d00 Michael Hanselmann

439 033a1d00 Michael Hanselmann
    """
440 033a1d00 Michael Hanselmann
    assert self.success is None
441 033a1d00 Michael Hanselmann
442 033a1d00 Michael Hanselmann
    self.success = success
443 033a1d00 Michael Hanselmann
    self.final_message = message
444 033a1d00 Michael Hanselmann
445 033a1d00 Michael Hanselmann
    if success:
446 194e8648 Iustin Pop
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
447 194e8648 Iustin Pop
                   self._daemon_name, self.node_name)
448 033a1d00 Michael Hanselmann
    elif self._daemon_name:
449 194e8648 Iustin Pop
      self._lu.LogWarning("%s '%s' on %s failed: %s",
450 033a1d00 Michael Hanselmann
                          self.MODE_TEXT, self._daemon_name, self.node_name,
451 033a1d00 Michael Hanselmann
                          message)
452 033a1d00 Michael Hanselmann
    else:
453 033a1d00 Michael Hanselmann
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
454 033a1d00 Michael Hanselmann
                          self.node_name, message)
455 033a1d00 Michael Hanselmann
456 033a1d00 Michael Hanselmann
    self._cbs.ReportFinished(self, self._private)
457 033a1d00 Michael Hanselmann
458 033a1d00 Michael Hanselmann
  def _Finalize(self):
459 033a1d00 Michael Hanselmann
    """Makes the RPC call to finalize this import/export.
460 033a1d00 Michael Hanselmann

461 033a1d00 Michael Hanselmann
    """
462 033a1d00 Michael Hanselmann
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
463 033a1d00 Michael Hanselmann
464 033a1d00 Michael Hanselmann
  def Finalize(self, error=None):
465 033a1d00 Michael Hanselmann
    """Finalizes this import/export.
466 033a1d00 Michael Hanselmann

467 033a1d00 Michael Hanselmann
    """
468 033a1d00 Michael Hanselmann
    if self._daemon_name:
469 194e8648 Iustin Pop
      logging.info("Finalizing %s '%s' on %s",
470 033a1d00 Michael Hanselmann
                   self.MODE_TEXT, self._daemon_name, self.node_name)
471 033a1d00 Michael Hanselmann
472 033a1d00 Michael Hanselmann
      result = self._Finalize()
473 033a1d00 Michael Hanselmann
      if result.fail_msg:
474 194e8648 Iustin Pop
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
475 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
476 033a1d00 Michael Hanselmann
                            self.node_name, result.fail_msg)
477 033a1d00 Michael Hanselmann
        return False
478 033a1d00 Michael Hanselmann
479 033a1d00 Michael Hanselmann
      # Daemon is no longer running
480 033a1d00 Michael Hanselmann
      self._daemon_name = None
481 033a1d00 Michael Hanselmann
      self._ts_cleanup = time.time()
482 033a1d00 Michael Hanselmann
483 033a1d00 Michael Hanselmann
    if error:
484 033a1d00 Michael Hanselmann
      self._ReportFinished(False, error)
485 033a1d00 Michael Hanselmann
486 033a1d00 Michael Hanselmann
    return True
487 033a1d00 Michael Hanselmann
488 033a1d00 Michael Hanselmann
489 033a1d00 Michael Hanselmann
class DiskImport(_DiskImportExportBase):
490 033a1d00 Michael Hanselmann
  MODE_TEXT = "import"
491 033a1d00 Michael Hanselmann
492 5e26c4d9 Iustin Pop
  def __init__(self, lu, node_name, opts, instance, component,
493 033a1d00 Michael Hanselmann
               dest, dest_args, timeouts, cbs, private=None):
494 033a1d00 Michael Hanselmann
    """Initializes this class.
495 033a1d00 Michael Hanselmann

496 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
497 033a1d00 Michael Hanselmann
    @type node_name: string
498 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
499 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
500 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
501 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
502 033a1d00 Michael Hanselmann
    @param instance: Instance object
503 5e26c4d9 Iustin Pop
    @type component: string
504 5e26c4d9 Iustin Pop
    @param component: which part of the instance is being imported
505 033a1d00 Michael Hanselmann
    @param dest: I/O destination
506 033a1d00 Michael Hanselmann
    @param dest_args: I/O arguments
507 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
508 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
509 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
510 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
511 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
512 033a1d00 Michael Hanselmann

513 033a1d00 Michael Hanselmann
    """
514 5e26c4d9 Iustin Pop
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
515 5e26c4d9 Iustin Pop
                                   component, timeouts, cbs, private)
516 033a1d00 Michael Hanselmann
    self._dest = dest
517 033a1d00 Michael Hanselmann
    self._dest_args = dest_args
518 033a1d00 Michael Hanselmann
519 033a1d00 Michael Hanselmann
    # Timestamps
520 033a1d00 Michael Hanselmann
    self._ts_listening = None
521 033a1d00 Michael Hanselmann
522 033a1d00 Michael Hanselmann
  @property
523 033a1d00 Michael Hanselmann
  def listen_port(self):
524 033a1d00 Michael Hanselmann
    """Returns the port the daemon is listening on.
525 033a1d00 Michael Hanselmann

526 033a1d00 Michael Hanselmann
    """
527 033a1d00 Michael Hanselmann
    if self._daemon:
528 033a1d00 Michael Hanselmann
      return self._daemon.listen_port
529 033a1d00 Michael Hanselmann
530 033a1d00 Michael Hanselmann
    return None
531 033a1d00 Michael Hanselmann
532 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
533 033a1d00 Michael Hanselmann
    """Starts the import daemon.
534 033a1d00 Michael Hanselmann

535 033a1d00 Michael Hanselmann
    """
536 eb630f50 Michael Hanselmann
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
537 6613661a Iustin Pop
                                          self._instance, self._component,
538 033a1d00 Michael Hanselmann
                                          self._dest, self._dest_args)
539 033a1d00 Michael Hanselmann
540 033a1d00 Michael Hanselmann
  def CheckListening(self):
541 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
542 033a1d00 Michael Hanselmann

543 033a1d00 Michael Hanselmann
    @rtype: bool
544 033a1d00 Michael Hanselmann
    @return: Whether the daemon is listening
545 033a1d00 Michael Hanselmann

546 033a1d00 Michael Hanselmann
    """
547 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
548 033a1d00 Michael Hanselmann
549 033a1d00 Michael Hanselmann
    if self._ts_listening is not None:
550 033a1d00 Michael Hanselmann
      return True
551 033a1d00 Michael Hanselmann
552 033a1d00 Michael Hanselmann
    port = self._daemon.listen_port
553 033a1d00 Michael Hanselmann
    if port is not None:
554 033a1d00 Michael Hanselmann
      self._ts_listening = time.time()
555 033a1d00 Michael Hanselmann
556 194e8648 Iustin Pop
      logging.debug("Import '%s' on %s is now listening on port %s",
557 033a1d00 Michael Hanselmann
                    self._daemon_name, self.node_name, port)
558 033a1d00 Michael Hanselmann
559 5e26c4d9 Iustin Pop
      self._cbs.ReportListening(self, self._private, self._component)
560 033a1d00 Michael Hanselmann
561 033a1d00 Michael Hanselmann
      return True
562 033a1d00 Michael Hanselmann
563 033a1d00 Michael Hanselmann
    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
564 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not listening after %s seconds" %
565 033a1d00 Michael Hanselmann
                               self._timeouts.listen)
566 033a1d00 Michael Hanselmann
567 033a1d00 Michael Hanselmann
    return False
568 033a1d00 Michael Hanselmann
569 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
570 033a1d00 Michael Hanselmann
    """Returns the time since we started listening.
571 033a1d00 Michael Hanselmann

572 033a1d00 Michael Hanselmann
    """
573 033a1d00 Michael Hanselmann
    assert self._ts_listening is not None, \
574 033a1d00 Michael Hanselmann
           ("Checking whether an import is connected is only useful"
575 033a1d00 Michael Hanselmann
            " once it's been listening")
576 033a1d00 Michael Hanselmann
577 033a1d00 Michael Hanselmann
    return self._ts_listening
578 033a1d00 Michael Hanselmann
579 033a1d00 Michael Hanselmann
580 033a1d00 Michael Hanselmann
class DiskExport(_DiskImportExportBase):
581 033a1d00 Michael Hanselmann
  MODE_TEXT = "export"
582 033a1d00 Michael Hanselmann
583 5e26c4d9 Iustin Pop
  def __init__(self, lu, node_name, opts, dest_host, dest_port,
584 5e26c4d9 Iustin Pop
               instance, component, source, source_args,
585 033a1d00 Michael Hanselmann
               timeouts, cbs, private=None):
586 033a1d00 Michael Hanselmann
    """Initializes this class.
587 033a1d00 Michael Hanselmann

588 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
589 033a1d00 Michael Hanselmann
    @type node_name: string
590 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
591 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
592 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
593 033a1d00 Michael Hanselmann
    @type dest_host: string
594 033a1d00 Michael Hanselmann
    @param dest_host: Destination host name or IP address
595 033a1d00 Michael Hanselmann
    @type dest_port: number
596 033a1d00 Michael Hanselmann
    @param dest_port: Destination port number
597 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
598 033a1d00 Michael Hanselmann
    @param instance: Instance object
599 5e26c4d9 Iustin Pop
    @type component: string
600 5e26c4d9 Iustin Pop
    @param component: which part of the instance is being imported
601 033a1d00 Michael Hanselmann
    @param source: I/O source
602 033a1d00 Michael Hanselmann
    @param source_args: I/O source
603 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
604 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
605 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
606 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
607 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
608 033a1d00 Michael Hanselmann

609 033a1d00 Michael Hanselmann
    """
610 5e26c4d9 Iustin Pop
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
611 5e26c4d9 Iustin Pop
                                   component, timeouts, cbs, private)
612 033a1d00 Michael Hanselmann
    self._dest_host = dest_host
613 033a1d00 Michael Hanselmann
    self._dest_port = dest_port
614 033a1d00 Michael Hanselmann
    self._source = source
615 033a1d00 Michael Hanselmann
    self._source_args = source_args
616 033a1d00 Michael Hanselmann
617 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
618 033a1d00 Michael Hanselmann
    """Starts the export daemon.
619 033a1d00 Michael Hanselmann

620 033a1d00 Michael Hanselmann
    """
621 eb630f50 Michael Hanselmann
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
622 033a1d00 Michael Hanselmann
                                          self._dest_host, self._dest_port,
623 6613661a Iustin Pop
                                          self._instance, self._component,
624 6613661a Iustin Pop
                                          self._source, self._source_args)
625 033a1d00 Michael Hanselmann
626 033a1d00 Michael Hanselmann
  def CheckListening(self):
627 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
628 033a1d00 Michael Hanselmann

629 033a1d00 Michael Hanselmann
    """
630 033a1d00 Michael Hanselmann
    # Only an import can be listening
631 033a1d00 Michael Hanselmann
    return True
632 033a1d00 Michael Hanselmann
633 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
634 033a1d00 Michael Hanselmann
    """Returns the time since the daemon started.
635 033a1d00 Michael Hanselmann

636 033a1d00 Michael Hanselmann
    """
637 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
638 033a1d00 Michael Hanselmann
639 033a1d00 Michael Hanselmann
    return self._ts_begin
640 033a1d00 Michael Hanselmann
641 033a1d00 Michael Hanselmann
642 1a2e7fe9 Michael Hanselmann
def FormatProgress(progress):
643 1a2e7fe9 Michael Hanselmann
  """Formats progress information for user consumption
644 1a2e7fe9 Michael Hanselmann

645 1a2e7fe9 Michael Hanselmann
  """
646 e6b8d02d Michael Hanselmann
  (mbytes, throughput, percent, eta) = progress
647 1a2e7fe9 Michael Hanselmann
648 1a2e7fe9 Michael Hanselmann
  parts = [
649 1a2e7fe9 Michael Hanselmann
    utils.FormatUnit(mbytes, "h"),
650 1a2e7fe9 Michael Hanselmann
651 1a2e7fe9 Michael Hanselmann
    # Not using FormatUnit as it doesn't support kilobytes
652 1a2e7fe9 Michael Hanselmann
    "%0.1f MiB/s" % throughput,
653 1a2e7fe9 Michael Hanselmann
    ]
654 1a2e7fe9 Michael Hanselmann
655 1a2e7fe9 Michael Hanselmann
  if percent is not None:
656 1a2e7fe9 Michael Hanselmann
    parts.append("%d%%" % percent)
657 1a2e7fe9 Michael Hanselmann
658 e6b8d02d Michael Hanselmann
  if eta is not None:
659 e6b8d02d Michael Hanselmann
    parts.append("ETA %s" % utils.FormatSeconds(eta))
660 1a2e7fe9 Michael Hanselmann
661 1a2e7fe9 Michael Hanselmann
  return utils.CommaJoin(parts)
662 1a2e7fe9 Michael Hanselmann
663 1a2e7fe9 Michael Hanselmann
664 033a1d00 Michael Hanselmann
class ImportExportLoop:
665 033a1d00 Michael Hanselmann
  MIN_DELAY = 1.0
666 033a1d00 Michael Hanselmann
  MAX_DELAY = 20.0
667 033a1d00 Michael Hanselmann
668 033a1d00 Michael Hanselmann
  def __init__(self, lu):
669 033a1d00 Michael Hanselmann
    """Initializes this class.
670 033a1d00 Michael Hanselmann

671 033a1d00 Michael Hanselmann
    """
672 033a1d00 Michael Hanselmann
    self._lu = lu
673 033a1d00 Michael Hanselmann
    self._queue = []
674 033a1d00 Michael Hanselmann
    self._pending_add = []
675 033a1d00 Michael Hanselmann
676 033a1d00 Michael Hanselmann
  def Add(self, diskie):
677 033a1d00 Michael Hanselmann
    """Adds an import/export object to the loop.
678 033a1d00 Michael Hanselmann

679 033a1d00 Michael Hanselmann
    @type diskie: Subclass of L{_DiskImportExportBase}
680 033a1d00 Michael Hanselmann
    @param diskie: Import/export object
681 033a1d00 Michael Hanselmann

682 033a1d00 Michael Hanselmann
    """
683 033a1d00 Michael Hanselmann
    assert diskie not in self._pending_add
684 033a1d00 Michael Hanselmann
    assert diskie.loop is None
685 033a1d00 Michael Hanselmann
686 033a1d00 Michael Hanselmann
    diskie.SetLoop(self)
687 033a1d00 Michael Hanselmann
688 033a1d00 Michael Hanselmann
    # Adding new objects to a staging list is necessary, otherwise the main
689 033a1d00 Michael Hanselmann
    # loop gets confused if callbacks modify the queue while the main loop is
690 033a1d00 Michael Hanselmann
    # iterating over it.
691 033a1d00 Michael Hanselmann
    self._pending_add.append(diskie)
692 033a1d00 Michael Hanselmann
693 033a1d00 Michael Hanselmann
  @staticmethod
694 033a1d00 Michael Hanselmann
  def _CollectDaemonStatus(lu, daemons):
695 033a1d00 Michael Hanselmann
    """Collects the status for all import/export daemons.
696 033a1d00 Michael Hanselmann

697 033a1d00 Michael Hanselmann
    """
698 033a1d00 Michael Hanselmann
    daemon_status = {}
699 033a1d00 Michael Hanselmann
700 033a1d00 Michael Hanselmann
    for node_name, names in daemons.iteritems():
701 033a1d00 Michael Hanselmann
      result = lu.rpc.call_impexp_status(node_name, names)
702 033a1d00 Michael Hanselmann
      if result.fail_msg:
703 033a1d00 Michael Hanselmann
        lu.LogWarning("Failed to get daemon status on %s: %s",
704 033a1d00 Michael Hanselmann
                      node_name, result.fail_msg)
705 033a1d00 Michael Hanselmann
        continue
706 033a1d00 Michael Hanselmann
707 033a1d00 Michael Hanselmann
      assert len(names) == len(result.payload)
708 033a1d00 Michael Hanselmann
709 033a1d00 Michael Hanselmann
      daemon_status[node_name] = dict(zip(names, result.payload))
710 033a1d00 Michael Hanselmann
711 033a1d00 Michael Hanselmann
    return daemon_status
712 033a1d00 Michael Hanselmann
713 033a1d00 Michael Hanselmann
  @staticmethod
714 033a1d00 Michael Hanselmann
  def _GetActiveDaemonNames(queue):
715 033a1d00 Michael Hanselmann
    """Gets the names of all active daemons.
716 033a1d00 Michael Hanselmann

717 033a1d00 Michael Hanselmann
    """
718 033a1d00 Michael Hanselmann
    result = {}
719 033a1d00 Michael Hanselmann
    for diskie in queue:
720 033a1d00 Michael Hanselmann
      if not diskie.active:
721 033a1d00 Michael Hanselmann
        continue
722 033a1d00 Michael Hanselmann
723 033a1d00 Michael Hanselmann
      try:
724 033a1d00 Michael Hanselmann
        # Start daemon if necessary
725 033a1d00 Michael Hanselmann
        daemon_name = diskie.CheckDaemon()
726 033a1d00 Michael Hanselmann
      except _ImportExportError, err:
727 033a1d00 Michael Hanselmann
        logging.exception("%s failed", diskie.MODE_TEXT)
728 033a1d00 Michael Hanselmann
        diskie.Finalize(error=str(err))
729 033a1d00 Michael Hanselmann
        continue
730 033a1d00 Michael Hanselmann
731 033a1d00 Michael Hanselmann
      result.setdefault(diskie.node_name, []).append(daemon_name)
732 033a1d00 Michael Hanselmann
733 033a1d00 Michael Hanselmann
    assert len(queue) >= len(result)
734 033a1d00 Michael Hanselmann
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
735 033a1d00 Michael Hanselmann
736 033a1d00 Michael Hanselmann
    logging.debug("daemons=%r", result)
737 033a1d00 Michael Hanselmann
738 033a1d00 Michael Hanselmann
    return result
739 033a1d00 Michael Hanselmann
740 033a1d00 Michael Hanselmann
  def _AddPendingToQueue(self):
741 033a1d00 Michael Hanselmann
    """Adds all pending import/export objects to the internal queue.
742 033a1d00 Michael Hanselmann

743 033a1d00 Michael Hanselmann
    """
744 033a1d00 Michael Hanselmann
    assert compat.all(diskie not in self._queue and diskie.loop == self
745 033a1d00 Michael Hanselmann
                      for diskie in self._pending_add)
746 033a1d00 Michael Hanselmann
747 033a1d00 Michael Hanselmann
    self._queue.extend(self._pending_add)
748 033a1d00 Michael Hanselmann
749 033a1d00 Michael Hanselmann
    del self._pending_add[:]
750 033a1d00 Michael Hanselmann
751 033a1d00 Michael Hanselmann
  def Run(self):
752 033a1d00 Michael Hanselmann
    """Utility main loop.
753 033a1d00 Michael Hanselmann

754 033a1d00 Michael Hanselmann
    """
755 033a1d00 Michael Hanselmann
    while True:
756 033a1d00 Michael Hanselmann
      self._AddPendingToQueue()
757 033a1d00 Michael Hanselmann
758 033a1d00 Michael Hanselmann
      # Collect all active daemon names
759 033a1d00 Michael Hanselmann
      daemons = self._GetActiveDaemonNames(self._queue)
760 033a1d00 Michael Hanselmann
      if not daemons:
761 033a1d00 Michael Hanselmann
        break
762 033a1d00 Michael Hanselmann
763 033a1d00 Michael Hanselmann
      # Collection daemon status data
764 033a1d00 Michael Hanselmann
      data = self._CollectDaemonStatus(self._lu, daemons)
765 033a1d00 Michael Hanselmann
766 033a1d00 Michael Hanselmann
      # Use data
767 033a1d00 Michael Hanselmann
      delay = self.MAX_DELAY
768 033a1d00 Michael Hanselmann
      for diskie in self._queue:
769 033a1d00 Michael Hanselmann
        if not diskie.active:
770 033a1d00 Michael Hanselmann
          continue
771 033a1d00 Michael Hanselmann
772 033a1d00 Michael Hanselmann
        try:
773 033a1d00 Michael Hanselmann
          try:
774 033a1d00 Michael Hanselmann
            all_daemon_data = data[diskie.node_name]
775 033a1d00 Michael Hanselmann
          except KeyError:
776 033a1d00 Michael Hanselmann
            result = diskie.SetDaemonData(False, None)
777 033a1d00 Michael Hanselmann
          else:
778 033a1d00 Michael Hanselmann
            result = \
779 033a1d00 Michael Hanselmann
              diskie.SetDaemonData(True,
780 033a1d00 Michael Hanselmann
                                   all_daemon_data[diskie.GetDaemonName()])
781 033a1d00 Michael Hanselmann
782 033a1d00 Michael Hanselmann
          if not result:
783 033a1d00 Michael Hanselmann
            # Daemon not yet ready, retry soon
784 033a1d00 Michael Hanselmann
            delay = min(3.0, delay)
785 033a1d00 Michael Hanselmann
            continue
786 033a1d00 Michael Hanselmann
787 033a1d00 Michael Hanselmann
          if diskie.CheckFinished():
788 033a1d00 Michael Hanselmann
            # Transfer finished
789 033a1d00 Michael Hanselmann
            diskie.Finalize()
790 033a1d00 Michael Hanselmann
            continue
791 033a1d00 Michael Hanselmann
792 033a1d00 Michael Hanselmann
          # Normal case: check again in 5 seconds
793 033a1d00 Michael Hanselmann
          delay = min(5.0, delay)
794 033a1d00 Michael Hanselmann
795 033a1d00 Michael Hanselmann
          if not diskie.CheckListening():
796 033a1d00 Michael Hanselmann
            # Not yet listening, retry soon
797 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
798 033a1d00 Michael Hanselmann
            continue
799 033a1d00 Michael Hanselmann
800 033a1d00 Michael Hanselmann
          if not diskie.CheckConnected():
801 033a1d00 Michael Hanselmann
            # Not yet connected, retry soon
802 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
803 033a1d00 Michael Hanselmann
            continue
804 033a1d00 Michael Hanselmann
805 033a1d00 Michael Hanselmann
        except _ImportExportError, err:
806 033a1d00 Michael Hanselmann
          logging.exception("%s failed", diskie.MODE_TEXT)
807 033a1d00 Michael Hanselmann
          diskie.Finalize(error=str(err))
808 033a1d00 Michael Hanselmann
809 403f5172 Guido Trotter
      if not compat.any(diskie.active for diskie in self._queue):
810 033a1d00 Michael Hanselmann
        break
811 033a1d00 Michael Hanselmann
812 033a1d00 Michael Hanselmann
      # Wait a bit
813 033a1d00 Michael Hanselmann
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
814 033a1d00 Michael Hanselmann
      logging.debug("Waiting for %ss", delay)
815 033a1d00 Michael Hanselmann
      time.sleep(delay)
816 033a1d00 Michael Hanselmann
817 033a1d00 Michael Hanselmann
  def FinalizeAll(self):
818 033a1d00 Michael Hanselmann
    """Finalizes all pending transfers.
819 033a1d00 Michael Hanselmann

820 033a1d00 Michael Hanselmann
    """
821 033a1d00 Michael Hanselmann
    success = True
822 033a1d00 Michael Hanselmann
823 033a1d00 Michael Hanselmann
    for diskie in self._queue:
824 033a1d00 Michael Hanselmann
      success = diskie.Finalize() and success
825 033a1d00 Michael Hanselmann
826 033a1d00 Michael Hanselmann
    return success
827 5d97d6dd Michael Hanselmann
828 5d97d6dd Michael Hanselmann
829 5d97d6dd Michael Hanselmann
class _TransferInstCbBase(ImportExportCbBase):
830 5d97d6dd Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
831 d51ae04c Michael Hanselmann
               dest_node, dest_ip):
832 5d97d6dd Michael Hanselmann
    """Initializes this class.
833 5d97d6dd Michael Hanselmann

834 5d97d6dd Michael Hanselmann
    """
835 5d97d6dd Michael Hanselmann
    ImportExportCbBase.__init__(self)
836 5d97d6dd Michael Hanselmann
837 5d97d6dd Michael Hanselmann
    self.lu = lu
838 5d97d6dd Michael Hanselmann
    self.feedback_fn = feedback_fn
839 5d97d6dd Michael Hanselmann
    self.instance = instance
840 5d97d6dd Michael Hanselmann
    self.timeouts = timeouts
841 5d97d6dd Michael Hanselmann
    self.src_node = src_node
842 5d97d6dd Michael Hanselmann
    self.src_cbs = src_cbs
843 5d97d6dd Michael Hanselmann
    self.dest_node = dest_node
844 5d97d6dd Michael Hanselmann
    self.dest_ip = dest_ip
845 5d97d6dd Michael Hanselmann
846 5d97d6dd Michael Hanselmann
847 5d97d6dd Michael Hanselmann
class _TransferInstSourceCb(_TransferInstCbBase):
848 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
849 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
850 5d97d6dd Michael Hanselmann

851 5d97d6dd Michael Hanselmann
    """
852 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
853 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
854 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
855 5d97d6dd Michael Hanselmann
856 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is sending data on %s" %
857 5d97d6dd Michael Hanselmann
                     (dtp.data.name, ie.node_name))
858 5d97d6dd Michael Hanselmann
859 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, dtp):
860 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
861 1a2e7fe9 Michael Hanselmann

862 1a2e7fe9 Michael Hanselmann
    """
863 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
864 1a2e7fe9 Michael Hanselmann
    if not progress:
865 1a2e7fe9 Michael Hanselmann
      return
866 1a2e7fe9 Michael Hanselmann
867 1a2e7fe9 Michael Hanselmann
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
868 1a2e7fe9 Michael Hanselmann
869 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
870 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
871 5d97d6dd Michael Hanselmann

872 5d97d6dd Michael Hanselmann
    """
873 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
874 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
875 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
876 5d97d6dd Michael Hanselmann
877 5d97d6dd Michael Hanselmann
    if ie.success:
878 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished sending data" % dtp.data.name)
879 5d97d6dd Michael Hanselmann
    else:
880 c9300bb3 Iustin Pop
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
881 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
882 5d97d6dd Michael Hanselmann
883 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
884 5d97d6dd Michael Hanselmann
885 5d97d6dd Michael Hanselmann
    cb = dtp.data.finished_fn
886 5d97d6dd Michael Hanselmann
    if cb:
887 5d97d6dd Michael Hanselmann
      cb()
888 5d97d6dd Michael Hanselmann
889 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
890 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
891 5d97d6dd Michael Hanselmann
    if dtp.dest_import and not ie.success:
892 5d97d6dd Michael Hanselmann
      dtp.dest_import.Abort()
893 5d97d6dd Michael Hanselmann
894 5d97d6dd Michael Hanselmann
895 5d97d6dd Michael Hanselmann
class _TransferInstDestCb(_TransferInstCbBase):
896 5e26c4d9 Iustin Pop
  def ReportListening(self, ie, dtp, component):
897 5d97d6dd Michael Hanselmann
    """Called when daemon started listening.
898 5d97d6dd Michael Hanselmann

899 5d97d6dd Michael Hanselmann
    """
900 5d97d6dd Michael Hanselmann
    assert self.src_cbs
901 5d97d6dd Michael Hanselmann
    assert dtp.src_export is None
902 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
903 d51ae04c Michael Hanselmann
    assert dtp.export_opts
904 5d97d6dd Michael Hanselmann
905 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
906 5d97d6dd Michael Hanselmann
907 5d97d6dd Michael Hanselmann
    # Start export on source node
908 d51ae04c Michael Hanselmann
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
909 5e26c4d9 Iustin Pop
                    self.dest_ip, ie.listen_port, self.instance,
910 5e26c4d9 Iustin Pop
                    component, dtp.data.src_io, dtp.data.src_ioargs,
911 5d97d6dd Michael Hanselmann
                    self.timeouts, self.src_cbs, private=dtp)
912 5d97d6dd Michael Hanselmann
    ie.loop.Add(de)
913 5d97d6dd Michael Hanselmann
914 5d97d6dd Michael Hanselmann
    dtp.src_export = de
915 5d97d6dd Michael Hanselmann
916 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
917 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
918 5d97d6dd Michael Hanselmann

919 5d97d6dd Michael Hanselmann
    """
920 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is receiving data on %s" %
921 5d97d6dd Michael Hanselmann
                     (dtp.data.name, self.dest_node))
922 5d97d6dd Michael Hanselmann
923 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
924 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
925 5d97d6dd Michael Hanselmann

926 5d97d6dd Michael Hanselmann
    """
927 5d97d6dd Michael Hanselmann
    if ie.success:
928 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
929 5d97d6dd Michael Hanselmann
    else:
930 c9300bb3 Iustin Pop
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
931 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
932 5d97d6dd Michael Hanselmann
933 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
934 5d97d6dd Michael Hanselmann
935 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
936 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
937 5d97d6dd Michael Hanselmann
    if dtp.src_export and not ie.success:
938 5d97d6dd Michael Hanselmann
      dtp.src_export.Abort()
939 5d97d6dd Michael Hanselmann
940 5d97d6dd Michael Hanselmann
941 5d97d6dd Michael Hanselmann
class DiskTransfer(object):
942 5d97d6dd Michael Hanselmann
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
943 5d97d6dd Michael Hanselmann
               finished_fn):
944 5d97d6dd Michael Hanselmann
    """Initializes this class.
945 5d97d6dd Michael Hanselmann

946 5d97d6dd Michael Hanselmann
    @type name: string
947 5d97d6dd Michael Hanselmann
    @param name: User-visible name for this transfer (e.g. "disk/0")
948 5d97d6dd Michael Hanselmann
    @param src_io: Source I/O type
949 5d97d6dd Michael Hanselmann
    @param src_ioargs: Source I/O arguments
950 5d97d6dd Michael Hanselmann
    @param dest_io: Destination I/O type
951 5d97d6dd Michael Hanselmann
    @param dest_ioargs: Destination I/O arguments
952 5d97d6dd Michael Hanselmann
    @type finished_fn: callable
953 5d97d6dd Michael Hanselmann
    @param finished_fn: Function called once transfer has finished
954 5d97d6dd Michael Hanselmann

955 5d97d6dd Michael Hanselmann
    """
956 5d97d6dd Michael Hanselmann
    self.name = name
957 5d97d6dd Michael Hanselmann
958 5d97d6dd Michael Hanselmann
    self.src_io = src_io
959 5d97d6dd Michael Hanselmann
    self.src_ioargs = src_ioargs
960 5d97d6dd Michael Hanselmann
961 5d97d6dd Michael Hanselmann
    self.dest_io = dest_io
962 5d97d6dd Michael Hanselmann
    self.dest_ioargs = dest_ioargs
963 5d97d6dd Michael Hanselmann
964 5d97d6dd Michael Hanselmann
    self.finished_fn = finished_fn
965 5d97d6dd Michael Hanselmann
966 5d97d6dd Michael Hanselmann
967 5d97d6dd Michael Hanselmann
class _DiskTransferPrivate(object):
968 d51ae04c Michael Hanselmann
  def __init__(self, data, success, export_opts):
969 5d97d6dd Michael Hanselmann
    """Initializes this class.
970 5d97d6dd Michael Hanselmann

971 5d97d6dd Michael Hanselmann
    @type data: L{DiskTransfer}
972 5d97d6dd Michael Hanselmann
    @type success: bool
973 5d97d6dd Michael Hanselmann

974 5d97d6dd Michael Hanselmann
    """
975 5d97d6dd Michael Hanselmann
    self.data = data
976 d51ae04c Michael Hanselmann
    self.success = success
977 d51ae04c Michael Hanselmann
    self.export_opts = export_opts
978 5d97d6dd Michael Hanselmann
979 5d97d6dd Michael Hanselmann
    self.src_export = None
980 5d97d6dd Michael Hanselmann
    self.dest_import = None
981 5d97d6dd Michael Hanselmann
982 5d97d6dd Michael Hanselmann
  def RecordResult(self, success):
983 5d97d6dd Michael Hanselmann
    """Updates the status.
984 5d97d6dd Michael Hanselmann

985 5d97d6dd Michael Hanselmann
    One failed part will cause the whole transfer to fail.
986 5d97d6dd Michael Hanselmann

987 5d97d6dd Michael Hanselmann
    """
988 5d97d6dd Michael Hanselmann
    self.success = self.success and success
989 5d97d6dd Michael Hanselmann
990 5d97d6dd Michael Hanselmann
991 d51ae04c Michael Hanselmann
def _GetInstDiskMagic(base, instance_name, index):
992 d51ae04c Michael Hanselmann
  """Computes the magic value for a disk export or import.
993 d51ae04c Michael Hanselmann

994 d51ae04c Michael Hanselmann
  @type base: string
995 d51ae04c Michael Hanselmann
  @param base: Random seed value (can be the same for all disks of a transfer)
996 d51ae04c Michael Hanselmann
  @type instance_name: string
997 d51ae04c Michael Hanselmann
  @param instance_name: Name of instance
998 d51ae04c Michael Hanselmann
  @type index: number
999 d51ae04c Michael Hanselmann
  @param index: Disk index
1000 d51ae04c Michael Hanselmann

1001 d51ae04c Michael Hanselmann
  """
1002 d51ae04c Michael Hanselmann
  h = compat.sha1_hash()
1003 d51ae04c Michael Hanselmann
  h.update(str(constants.RIE_VERSION))
1004 d51ae04c Michael Hanselmann
  h.update(base)
1005 d51ae04c Michael Hanselmann
  h.update(instance_name)
1006 d51ae04c Michael Hanselmann
  h.update(str(index))
1007 d51ae04c Michael Hanselmann
  return h.hexdigest()
1008 d51ae04c Michael Hanselmann
1009 d51ae04c Michael Hanselmann
1010 5d97d6dd Michael Hanselmann
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1011 5d97d6dd Michael Hanselmann
                         instance, all_transfers):
1012 5d97d6dd Michael Hanselmann
  """Transfers an instance's data from one node to another.
1013 5d97d6dd Michael Hanselmann

1014 5d97d6dd Michael Hanselmann
  @param lu: Logical unit instance
1015 5d97d6dd Michael Hanselmann
  @param feedback_fn: Feedback function
1016 5d97d6dd Michael Hanselmann
  @type src_node: string
1017 5d97d6dd Michael Hanselmann
  @param src_node: Source node name
1018 5d97d6dd Michael Hanselmann
  @type dest_node: string
1019 5d97d6dd Michael Hanselmann
  @param dest_node: Destination node name
1020 5d97d6dd Michael Hanselmann
  @type dest_ip: string
1021 5d97d6dd Michael Hanselmann
  @param dest_ip: IP address of destination node
1022 5d97d6dd Michael Hanselmann
  @type instance: L{objects.Instance}
1023 5d97d6dd Michael Hanselmann
  @param instance: Instance object
1024 5d97d6dd Michael Hanselmann
  @type all_transfers: list of L{DiskTransfer} instances
1025 5d97d6dd Michael Hanselmann
  @param all_transfers: List of all disk transfers to be made
1026 5d97d6dd Michael Hanselmann
  @rtype: list
1027 5d97d6dd Michael Hanselmann
  @return: List with a boolean (True=successful, False=failed) for success for
1028 5d97d6dd Michael Hanselmann
           each transfer
1029 5d97d6dd Michael Hanselmann

1030 5d97d6dd Michael Hanselmann
  """
1031 5bb95572 Michael Hanselmann
  # Disable compression for all moves as these are all within the same cluster
1032 5bb95572 Michael Hanselmann
  compress = constants.IEC_NONE
1033 a5310c2a Michael Hanselmann
1034 a5310c2a Michael Hanselmann
  logging.debug("Source node %s, destination node %s, compression '%s'",
1035 a5310c2a Michael Hanselmann
                src_node, dest_node, compress)
1036 a5310c2a Michael Hanselmann
1037 5d97d6dd Michael Hanselmann
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1038 5d97d6dd Michael Hanselmann
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1039 d51ae04c Michael Hanselmann
                                  src_node, None, dest_node, dest_ip)
1040 5d97d6dd Michael Hanselmann
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1041 d51ae04c Michael Hanselmann
                                 src_node, src_cbs, dest_node, dest_ip)
1042 5d97d6dd Michael Hanselmann
1043 5d97d6dd Michael Hanselmann
  all_dtp = []
1044 5d97d6dd Michael Hanselmann
1045 d51ae04c Michael Hanselmann
  base_magic = utils.GenerateSecret(6)
1046 d51ae04c Michael Hanselmann
1047 5d97d6dd Michael Hanselmann
  ieloop = ImportExportLoop(lu)
1048 5d97d6dd Michael Hanselmann
  try:
1049 d51ae04c Michael Hanselmann
    for idx, transfer in enumerate(all_transfers):
1050 5d97d6dd Michael Hanselmann
      if transfer:
1051 5d97d6dd Michael Hanselmann
        feedback_fn("Exporting %s from %s to %s" %
1052 5d97d6dd Michael Hanselmann
                    (transfer.name, src_node, dest_node))
1053 5d97d6dd Michael Hanselmann
1054 d51ae04c Michael Hanselmann
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1055 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1056 d51ae04c Michael Hanselmann
                                           compress=compress, magic=magic)
1057 d51ae04c Michael Hanselmann
1058 d51ae04c Michael Hanselmann
        dtp = _DiskTransferPrivate(transfer, True, opts)
1059 5d97d6dd Michael Hanselmann
1060 5e26c4d9 Iustin Pop
        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
1061 5d97d6dd Michael Hanselmann
                        transfer.dest_io, transfer.dest_ioargs,
1062 5d97d6dd Michael Hanselmann
                        timeouts, dest_cbs, private=dtp)
1063 5d97d6dd Michael Hanselmann
        ieloop.Add(di)
1064 5d97d6dd Michael Hanselmann
1065 5d97d6dd Michael Hanselmann
        dtp.dest_import = di
1066 5d97d6dd Michael Hanselmann
      else:
1067 85b3901b Michael Hanselmann
        dtp = _DiskTransferPrivate(None, False, None)
1068 5d97d6dd Michael Hanselmann
1069 5d97d6dd Michael Hanselmann
      all_dtp.append(dtp)
1070 5d97d6dd Michael Hanselmann
1071 5d97d6dd Michael Hanselmann
    ieloop.Run()
1072 5d97d6dd Michael Hanselmann
  finally:
1073 5d97d6dd Michael Hanselmann
    ieloop.FinalizeAll()
1074 5d97d6dd Michael Hanselmann
1075 5d97d6dd Michael Hanselmann
  assert len(all_dtp) == len(all_transfers)
1076 403f5172 Guido Trotter
  assert compat.all((dtp.src_export is None or
1077 5d97d6dd Michael Hanselmann
                      dtp.src_export.success is not None) and
1078 5d97d6dd Michael Hanselmann
                     (dtp.dest_import is None or
1079 5d97d6dd Michael Hanselmann
                      dtp.dest_import.success is not None)
1080 403f5172 Guido Trotter
                     for dtp in all_dtp), \
1081 5d97d6dd Michael Hanselmann
         "Not all imports/exports are finalized"
1082 5d97d6dd Michael Hanselmann
1083 5d97d6dd Michael Hanselmann
  return [bool(dtp.success) for dtp in all_dtp]
1084 387794f8 Michael Hanselmann
1085 387794f8 Michael Hanselmann
1086 4a96f1d1 Michael Hanselmann
class _RemoteExportCb(ImportExportCbBase):
1087 4a96f1d1 Michael Hanselmann
  def __init__(self, feedback_fn, disk_count):
1088 4a96f1d1 Michael Hanselmann
    """Initializes this class.
1089 4a96f1d1 Michael Hanselmann

1090 4a96f1d1 Michael Hanselmann
    """
1091 4a96f1d1 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1092 4a96f1d1 Michael Hanselmann
    self._feedback_fn = feedback_fn
1093 4a96f1d1 Michael Hanselmann
    self._dresults = [None] * disk_count
1094 4a96f1d1 Michael Hanselmann
1095 4a96f1d1 Michael Hanselmann
  @property
1096 4a96f1d1 Michael Hanselmann
  def disk_results(self):
1097 4a96f1d1 Michael Hanselmann
    """Returns per-disk results.
1098 4a96f1d1 Michael Hanselmann

1099 4a96f1d1 Michael Hanselmann
    """
1100 4a96f1d1 Michael Hanselmann
    return self._dresults
1101 4a96f1d1 Michael Hanselmann
1102 4a96f1d1 Michael Hanselmann
  def ReportConnected(self, ie, private):
1103 4a96f1d1 Michael Hanselmann
    """Called when a connection has been established.
1104 4a96f1d1 Michael Hanselmann

1105 4a96f1d1 Michael Hanselmann
    """
1106 4a96f1d1 Michael Hanselmann
    (idx, _) = private
1107 4a96f1d1 Michael Hanselmann
1108 4a96f1d1 Michael Hanselmann
    self._feedback_fn("Disk %s is now sending data" % idx)
1109 4a96f1d1 Michael Hanselmann
1110 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
1111 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
1112 1a2e7fe9 Michael Hanselmann

1113 1a2e7fe9 Michael Hanselmann
    """
1114 1a2e7fe9 Michael Hanselmann
    (idx, _) = private
1115 1a2e7fe9 Michael Hanselmann
1116 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
1117 1a2e7fe9 Michael Hanselmann
    if not progress:
1118 1a2e7fe9 Michael Hanselmann
      return
1119 1a2e7fe9 Michael Hanselmann
1120 1a2e7fe9 Michael Hanselmann
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1121 1a2e7fe9 Michael Hanselmann
1122 4a96f1d1 Michael Hanselmann
  def ReportFinished(self, ie, private):
1123 4a96f1d1 Michael Hanselmann
    """Called when a transfer has finished.
1124 4a96f1d1 Michael Hanselmann

1125 4a96f1d1 Michael Hanselmann
    """
1126 4a96f1d1 Michael Hanselmann
    (idx, finished_fn) = private
1127 4a96f1d1 Michael Hanselmann
1128 4a96f1d1 Michael Hanselmann
    if ie.success:
1129 4a96f1d1 Michael Hanselmann
      self._feedback_fn("Disk %s finished sending data" % idx)
1130 4a96f1d1 Michael Hanselmann
    else:
1131 c9300bb3 Iustin Pop
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1132 4a96f1d1 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1133 4a96f1d1 Michael Hanselmann
1134 4a96f1d1 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1135 4a96f1d1 Michael Hanselmann
1136 4a96f1d1 Michael Hanselmann
    if finished_fn:
1137 4a96f1d1 Michael Hanselmann
      finished_fn()
1138 4a96f1d1 Michael Hanselmann
1139 4a96f1d1 Michael Hanselmann
1140 387794f8 Michael Hanselmann
class ExportInstanceHelper:
1141 387794f8 Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance):
1142 387794f8 Michael Hanselmann
    """Initializes this class.
1143 387794f8 Michael Hanselmann

1144 387794f8 Michael Hanselmann
    @param lu: Logical unit instance
1145 387794f8 Michael Hanselmann
    @param feedback_fn: Feedback function
1146 387794f8 Michael Hanselmann
    @type instance: L{objects.Instance}
1147 387794f8 Michael Hanselmann
    @param instance: Instance object
1148 387794f8 Michael Hanselmann

1149 387794f8 Michael Hanselmann
    """
1150 387794f8 Michael Hanselmann
    self._lu = lu
1151 387794f8 Michael Hanselmann
    self._feedback_fn = feedback_fn
1152 387794f8 Michael Hanselmann
    self._instance = instance
1153 387794f8 Michael Hanselmann
1154 387794f8 Michael Hanselmann
    self._snap_disks = []
1155 387794f8 Michael Hanselmann
    self._removed_snaps = [False] * len(instance.disks)
1156 387794f8 Michael Hanselmann
1157 387794f8 Michael Hanselmann
  def CreateSnapshots(self):
1158 387794f8 Michael Hanselmann
    """Creates an LVM snapshot for every disk of the instance.
1159 387794f8 Michael Hanselmann

1160 387794f8 Michael Hanselmann
    """
1161 387794f8 Michael Hanselmann
    assert not self._snap_disks
1162 387794f8 Michael Hanselmann
1163 387794f8 Michael Hanselmann
    instance = self._instance
1164 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1165 387794f8 Michael Hanselmann
1166 387794f8 Michael Hanselmann
    for idx, disk in enumerate(instance.disks):
1167 387794f8 Michael Hanselmann
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1168 387794f8 Michael Hanselmann
                        (idx, src_node))
1169 387794f8 Michael Hanselmann
1170 387794f8 Michael Hanselmann
      # result.payload will be a snapshot of an lvm leaf of the one we
1171 387794f8 Michael Hanselmann
      # passed
1172 387794f8 Michael Hanselmann
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1173 800ac399 Iustin Pop
      new_dev = False
1174 387794f8 Michael Hanselmann
      msg = result.fail_msg
1175 387794f8 Michael Hanselmann
      if msg:
1176 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1177 387794f8 Michael Hanselmann
                            idx, src_node, msg)
1178 800ac399 Iustin Pop
      elif (not isinstance(result.payload, (tuple, list)) or
1179 800ac399 Iustin Pop
            len(result.payload) != 2):
1180 800ac399 Iustin Pop
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1181 800ac399 Iustin Pop
                            " result '%s'", idx, src_node, result.payload)
1182 387794f8 Michael Hanselmann
      else:
1183 800ac399 Iustin Pop
        disk_id = tuple(result.payload)
1184 387794f8 Michael Hanselmann
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1185 387794f8 Michael Hanselmann
                               logical_id=disk_id, physical_id=disk_id,
1186 387794f8 Michael Hanselmann
                               iv_name=disk.iv_name)
1187 387794f8 Michael Hanselmann
1188 387794f8 Michael Hanselmann
      self._snap_disks.append(new_dev)
1189 387794f8 Michael Hanselmann
1190 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1191 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(instance.disks)
1192 387794f8 Michael Hanselmann
1193 387794f8 Michael Hanselmann
  def _RemoveSnapshot(self, disk_index):
1194 387794f8 Michael Hanselmann
    """Removes an LVM snapshot.
1195 387794f8 Michael Hanselmann

1196 387794f8 Michael Hanselmann
    @type disk_index: number
1197 387794f8 Michael Hanselmann
    @param disk_index: Index of the snapshot to be removed
1198 387794f8 Michael Hanselmann

1199 387794f8 Michael Hanselmann
    """
1200 387794f8 Michael Hanselmann
    disk = self._snap_disks[disk_index]
1201 387794f8 Michael Hanselmann
    if disk and not self._removed_snaps[disk_index]:
1202 387794f8 Michael Hanselmann
      src_node = self._instance.primary_node
1203 387794f8 Michael Hanselmann
1204 387794f8 Michael Hanselmann
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1205 387794f8 Michael Hanselmann
                        (disk_index, src_node))
1206 387794f8 Michael Hanselmann
1207 387794f8 Michael Hanselmann
      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1208 387794f8 Michael Hanselmann
      if result.fail_msg:
1209 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1210 387794f8 Michael Hanselmann
                            " %s: %s", disk_index, src_node, result.fail_msg)
1211 387794f8 Michael Hanselmann
      else:
1212 387794f8 Michael Hanselmann
        self._removed_snaps[disk_index] = True
1213 387794f8 Michael Hanselmann
1214 387794f8 Michael Hanselmann
  def LocalExport(self, dest_node):
1215 387794f8 Michael Hanselmann
    """Intra-cluster instance export.
1216 387794f8 Michael Hanselmann

1217 387794f8 Michael Hanselmann
    @type dest_node: L{objects.Node}
1218 387794f8 Michael Hanselmann
    @param dest_node: Destination node
1219 387794f8 Michael Hanselmann

1220 387794f8 Michael Hanselmann
    """
1221 387794f8 Michael Hanselmann
    instance = self._instance
1222 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1223 387794f8 Michael Hanselmann
1224 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1225 387794f8 Michael Hanselmann
1226 387794f8 Michael Hanselmann
    transfers = []
1227 387794f8 Michael Hanselmann
1228 387794f8 Michael Hanselmann
    for idx, dev in enumerate(self._snap_disks):
1229 387794f8 Michael Hanselmann
      if not dev:
1230 387794f8 Michael Hanselmann
        transfers.append(None)
1231 387794f8 Michael Hanselmann
        continue
1232 387794f8 Michael Hanselmann
1233 387794f8 Michael Hanselmann
      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1234 387794f8 Michael Hanselmann
                            dev.physical_id[1])
1235 387794f8 Michael Hanselmann
1236 387794f8 Michael Hanselmann
      finished_fn = compat.partial(self._TransferFinished, idx)
1237 387794f8 Michael Hanselmann
1238 387794f8 Michael Hanselmann
      # FIXME: pass debug option from opcode to backend
1239 387794f8 Michael Hanselmann
      dt = DiskTransfer("snapshot/%s" % idx,
1240 387794f8 Michael Hanselmann
                        constants.IEIO_SCRIPT, (dev, idx),
1241 387794f8 Michael Hanselmann
                        constants.IEIO_FILE, (path, ),
1242 387794f8 Michael Hanselmann
                        finished_fn)
1243 387794f8 Michael Hanselmann
      transfers.append(dt)
1244 387794f8 Michael Hanselmann
1245 387794f8 Michael Hanselmann
    # Actually export data
1246 387794f8 Michael Hanselmann
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1247 387794f8 Michael Hanselmann
                                    src_node, dest_node.name,
1248 387794f8 Michael Hanselmann
                                    dest_node.secondary_ip,
1249 387794f8 Michael Hanselmann
                                    instance, transfers)
1250 387794f8 Michael Hanselmann
1251 387794f8 Michael Hanselmann
    assert len(dresults) == len(instance.disks)
1252 387794f8 Michael Hanselmann
1253 387794f8 Michael Hanselmann
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1254 387794f8 Michael Hanselmann
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1255 387794f8 Michael Hanselmann
                                               self._snap_disks)
1256 387794f8 Michael Hanselmann
    msg = result.fail_msg
1257 387794f8 Michael Hanselmann
    fin_resu = not msg
1258 387794f8 Michael Hanselmann
    if msg:
1259 387794f8 Michael Hanselmann
      self._lu.LogWarning("Could not finalize export for instance %s"
1260 387794f8 Michael Hanselmann
                          " on node %s: %s", instance.name, dest_node.name, msg)
1261 387794f8 Michael Hanselmann
1262 387794f8 Michael Hanselmann
    return (fin_resu, dresults)
1263 387794f8 Michael Hanselmann
1264 d51ae04c Michael Hanselmann
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1265 4a96f1d1 Michael Hanselmann
    """Inter-cluster instance export.
1266 4a96f1d1 Michael Hanselmann

1267 4a96f1d1 Michael Hanselmann
    @type disk_info: list
1268 4a96f1d1 Michael Hanselmann
    @param disk_info: Per-disk destination information
1269 d51ae04c Michael Hanselmann
    @type key_name: string
1270 d51ae04c Michael Hanselmann
    @param key_name: Name of X509 key to use
1271 d51ae04c Michael Hanselmann
    @type dest_ca_pem: string
1272 d51ae04c Michael Hanselmann
    @param dest_ca_pem: Destination X509 CA in PEM format
1273 4a96f1d1 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
1274 4a96f1d1 Michael Hanselmann
    @param timeouts: Timeouts for this import
1275 4a96f1d1 Michael Hanselmann

1276 4a96f1d1 Michael Hanselmann
    """
1277 4a96f1d1 Michael Hanselmann
    instance = self._instance
1278 4a96f1d1 Michael Hanselmann
1279 4a96f1d1 Michael Hanselmann
    assert len(disk_info) == len(instance.disks)
1280 4a96f1d1 Michael Hanselmann
1281 4a96f1d1 Michael Hanselmann
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1282 4a96f1d1 Michael Hanselmann
1283 4a96f1d1 Michael Hanselmann
    ieloop = ImportExportLoop(self._lu)
1284 4a96f1d1 Michael Hanselmann
    try:
1285 d51ae04c Michael Hanselmann
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1286 d51ae04c Michael Hanselmann
                                                           disk_info)):
1287 ba5619c2 Michael Hanselmann
        # Decide whether to use IPv6
1288 ba5619c2 Michael Hanselmann
        ipv6 = netutils.IP6Address.IsValid(host)
1289 ba5619c2 Michael Hanselmann
1290 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=key_name,
1291 d51ae04c Michael Hanselmann
                                           ca_pem=dest_ca_pem,
1292 ba5619c2 Michael Hanselmann
                                           magic=magic, ipv6=ipv6)
1293 d51ae04c Michael Hanselmann
1294 4a96f1d1 Michael Hanselmann
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1295 4a96f1d1 Michael Hanselmann
        finished_fn = compat.partial(self._TransferFinished, idx)
1296 4a96f1d1 Michael Hanselmann
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1297 5e26c4d9 Iustin Pop
                              opts, host, port, instance, "disk%d" % idx,
1298 4a96f1d1 Michael Hanselmann
                              constants.IEIO_SCRIPT, (dev, idx),
1299 4a96f1d1 Michael Hanselmann
                              timeouts, cbs, private=(idx, finished_fn)))
1300 4a96f1d1 Michael Hanselmann
1301 4a96f1d1 Michael Hanselmann
      ieloop.Run()
1302 4a96f1d1 Michael Hanselmann
    finally:
1303 4a96f1d1 Michael Hanselmann
      ieloop.FinalizeAll()
1304 4a96f1d1 Michael Hanselmann
1305 4a96f1d1 Michael Hanselmann
    return (True, cbs.disk_results)
1306 4a96f1d1 Michael Hanselmann
1307 387794f8 Michael Hanselmann
  def _TransferFinished(self, idx):
1308 387794f8 Michael Hanselmann
    """Called once a transfer has finished.
1309 387794f8 Michael Hanselmann

1310 387794f8 Michael Hanselmann
    @type idx: number
1311 387794f8 Michael Hanselmann
    @param idx: Disk index
1312 387794f8 Michael Hanselmann

1313 387794f8 Michael Hanselmann
    """
1314 387794f8 Michael Hanselmann
    logging.debug("Transfer %s finished", idx)
1315 387794f8 Michael Hanselmann
    self._RemoveSnapshot(idx)
1316 387794f8 Michael Hanselmann
1317 387794f8 Michael Hanselmann
  def Cleanup(self):
1318 387794f8 Michael Hanselmann
    """Remove all snapshots.
1319 387794f8 Michael Hanselmann

1320 387794f8 Michael Hanselmann
    """
1321 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(self._instance.disks)
1322 387794f8 Michael Hanselmann
    for idx in range(len(self._instance.disks)):
1323 387794f8 Michael Hanselmann
      self._RemoveSnapshot(idx)
1324 1410fa8d Michael Hanselmann
1325 1410fa8d Michael Hanselmann
1326 9bf56d77 Michael Hanselmann
class _RemoteImportCb(ImportExportCbBase):
1327 9bf56d77 Michael Hanselmann
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1328 9bf56d77 Michael Hanselmann
               external_address):
1329 9bf56d77 Michael Hanselmann
    """Initializes this class.
1330 9bf56d77 Michael Hanselmann

1331 9bf56d77 Michael Hanselmann
    @type cds: string
1332 9bf56d77 Michael Hanselmann
    @param cds: Cluster domain secret
1333 9bf56d77 Michael Hanselmann
    @type x509_cert_pem: string
1334 9bf56d77 Michael Hanselmann
    @param x509_cert_pem: CA used for signing import key
1335 9bf56d77 Michael Hanselmann
    @type disk_count: number
1336 9bf56d77 Michael Hanselmann
    @param disk_count: Number of disks
1337 9bf56d77 Michael Hanselmann
    @type external_address: string
1338 9bf56d77 Michael Hanselmann
    @param external_address: External address of destination node
1339 9bf56d77 Michael Hanselmann

1340 9bf56d77 Michael Hanselmann
    """
1341 9bf56d77 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1342 9bf56d77 Michael Hanselmann
    self._feedback_fn = feedback_fn
1343 9bf56d77 Michael Hanselmann
    self._cds = cds
1344 9bf56d77 Michael Hanselmann
    self._x509_cert_pem = x509_cert_pem
1345 9bf56d77 Michael Hanselmann
    self._disk_count = disk_count
1346 9bf56d77 Michael Hanselmann
    self._external_address = external_address
1347 9bf56d77 Michael Hanselmann
1348 9bf56d77 Michael Hanselmann
    self._dresults = [None] * disk_count
1349 9bf56d77 Michael Hanselmann
    self._daemon_port = [None] * disk_count
1350 9bf56d77 Michael Hanselmann
1351 9bf56d77 Michael Hanselmann
    self._salt = utils.GenerateSecret(8)
1352 9bf56d77 Michael Hanselmann
1353 9bf56d77 Michael Hanselmann
  @property
1354 9bf56d77 Michael Hanselmann
  def disk_results(self):
1355 9bf56d77 Michael Hanselmann
    """Returns per-disk results.
1356 9bf56d77 Michael Hanselmann

1357 9bf56d77 Michael Hanselmann
    """
1358 9bf56d77 Michael Hanselmann
    return self._dresults
1359 9bf56d77 Michael Hanselmann
1360 9bf56d77 Michael Hanselmann
  def _CheckAllListening(self):
1361 9bf56d77 Michael Hanselmann
    """Checks whether all daemons are listening.
1362 9bf56d77 Michael Hanselmann

1363 9bf56d77 Michael Hanselmann
    If all daemons are listening, the information is sent to the client.
1364 9bf56d77 Michael Hanselmann

1365 9bf56d77 Michael Hanselmann
    """
1366 9bf56d77 Michael Hanselmann
    if not compat.all(dp is not None for dp in self._daemon_port):
1367 9bf56d77 Michael Hanselmann
      return
1368 9bf56d77 Michael Hanselmann
1369 9bf56d77 Michael Hanselmann
    host = self._external_address
1370 9bf56d77 Michael Hanselmann
1371 9bf56d77 Michael Hanselmann
    disks = []
1372 d51ae04c Michael Hanselmann
    for idx, (port, magic) in enumerate(self._daemon_port):
1373 9bf56d77 Michael Hanselmann
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1374 d51ae04c Michael Hanselmann
                                               idx, host, port, magic))
1375 9bf56d77 Michael Hanselmann
1376 9bf56d77 Michael Hanselmann
    assert len(disks) == self._disk_count
1377 9bf56d77 Michael Hanselmann
1378 9bf56d77 Michael Hanselmann
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1379 9bf56d77 Michael Hanselmann
      "disks": disks,
1380 9bf56d77 Michael Hanselmann
      "x509_ca": self._x509_cert_pem,
1381 9bf56d77 Michael Hanselmann
      })
1382 9bf56d77 Michael Hanselmann
1383 5e26c4d9 Iustin Pop
  def ReportListening(self, ie, private, _):
1384 9bf56d77 Michael Hanselmann
    """Called when daemon started listening.
1385 9bf56d77 Michael Hanselmann

1386 9bf56d77 Michael Hanselmann
    """
1387 9bf56d77 Michael Hanselmann
    (idx, ) = private
1388 9bf56d77 Michael Hanselmann
1389 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now listening" % idx)
1390 9bf56d77 Michael Hanselmann
1391 9bf56d77 Michael Hanselmann
    assert self._daemon_port[idx] is None
1392 9bf56d77 Michael Hanselmann
1393 d51ae04c Michael Hanselmann
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1394 9bf56d77 Michael Hanselmann
1395 9bf56d77 Michael Hanselmann
    self._CheckAllListening()
1396 9bf56d77 Michael Hanselmann
1397 9bf56d77 Michael Hanselmann
  def ReportConnected(self, ie, private):
1398 9bf56d77 Michael Hanselmann
    """Called when a connection has been established.
1399 9bf56d77 Michael Hanselmann

1400 9bf56d77 Michael Hanselmann
    """
1401 9bf56d77 Michael Hanselmann
    (idx, ) = private
1402 9bf56d77 Michael Hanselmann
1403 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now receiving data" % idx)
1404 9bf56d77 Michael Hanselmann
1405 9bf56d77 Michael Hanselmann
  def ReportFinished(self, ie, private):
1406 9bf56d77 Michael Hanselmann
    """Called when a transfer has finished.
1407 9bf56d77 Michael Hanselmann

1408 9bf56d77 Michael Hanselmann
    """
1409 9bf56d77 Michael Hanselmann
    (idx, ) = private
1410 9bf56d77 Michael Hanselmann
1411 9bf56d77 Michael Hanselmann
    # Daemon is certainly no longer listening
1412 9bf56d77 Michael Hanselmann
    self._daemon_port[idx] = None
1413 9bf56d77 Michael Hanselmann
1414 9bf56d77 Michael Hanselmann
    if ie.success:
1415 9bf56d77 Michael Hanselmann
      self._feedback_fn("Disk %s finished receiving data" % idx)
1416 9bf56d77 Michael Hanselmann
    else:
1417 9bf56d77 Michael Hanselmann
      self._feedback_fn(("Disk %s failed to receive data: %s"
1418 c9300bb3 Iustin Pop
                         " (recent output: %s)") %
1419 9bf56d77 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1420 9bf56d77 Michael Hanselmann
1421 9bf56d77 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1422 9bf56d77 Michael Hanselmann
1423 9bf56d77 Michael Hanselmann
1424 ba5619c2 Michael Hanselmann
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1425 ba5619c2 Michael Hanselmann
                 cds, timeouts):
1426 9bf56d77 Michael Hanselmann
  """Imports an instance from another cluster.
1427 9bf56d77 Michael Hanselmann

1428 9bf56d77 Michael Hanselmann
  @param lu: Logical unit instance
1429 9bf56d77 Michael Hanselmann
  @param feedback_fn: Feedback function
1430 9bf56d77 Michael Hanselmann
  @type instance: L{objects.Instance}
1431 9bf56d77 Michael Hanselmann
  @param instance: Instance object
1432 ba5619c2 Michael Hanselmann
  @type pnode: L{objects.Node}
1433 ba5619c2 Michael Hanselmann
  @param pnode: Primary node of instance as an object
1434 9bf56d77 Michael Hanselmann
  @type source_x509_ca: OpenSSL.crypto.X509
1435 9bf56d77 Michael Hanselmann
  @param source_x509_ca: Import source's X509 CA
1436 9bf56d77 Michael Hanselmann
  @type cds: string
1437 9bf56d77 Michael Hanselmann
  @param cds: Cluster domain secret
1438 9bf56d77 Michael Hanselmann
  @type timeouts: L{ImportExportTimeouts}
1439 9bf56d77 Michael Hanselmann
  @param timeouts: Timeouts for this import
1440 9bf56d77 Michael Hanselmann

1441 9bf56d77 Michael Hanselmann
  """
1442 9bf56d77 Michael Hanselmann
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1443 9bf56d77 Michael Hanselmann
                                                  source_x509_ca)
1444 9bf56d77 Michael Hanselmann
1445 d51ae04c Michael Hanselmann
  magic_base = utils.GenerateSecret(6)
1446 d51ae04c Michael Hanselmann
1447 ba5619c2 Michael Hanselmann
  # Decide whether to use IPv6
1448 ba5619c2 Michael Hanselmann
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1449 ba5619c2 Michael Hanselmann
1450 9bf56d77 Michael Hanselmann
  # Create crypto key
1451 9bf56d77 Michael Hanselmann
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1452 9bf56d77 Michael Hanselmann
                                        constants.RIE_CERT_VALIDITY)
1453 9bf56d77 Michael Hanselmann
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1454 9bf56d77 Michael Hanselmann
1455 9bf56d77 Michael Hanselmann
  (x509_key_name, x509_cert_pem) = result.payload
1456 9bf56d77 Michael Hanselmann
  try:
1457 9bf56d77 Michael Hanselmann
    # Load certificate
1458 9bf56d77 Michael Hanselmann
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1459 9bf56d77 Michael Hanselmann
                                                x509_cert_pem)
1460 9bf56d77 Michael Hanselmann
1461 9bf56d77 Michael Hanselmann
    # Sign certificate
1462 9bf56d77 Michael Hanselmann
    signed_x509_cert_pem = \
1463 9bf56d77 Michael Hanselmann
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1464 9bf56d77 Michael Hanselmann
1465 9bf56d77 Michael Hanselmann
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1466 ba5619c2 Michael Hanselmann
                          len(instance.disks), pnode.primary_ip)
1467 9bf56d77 Michael Hanselmann
1468 9bf56d77 Michael Hanselmann
    ieloop = ImportExportLoop(lu)
1469 9bf56d77 Michael Hanselmann
    try:
1470 9bf56d77 Michael Hanselmann
      for idx, dev in enumerate(instance.disks):
1471 d51ae04c Michael Hanselmann
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1472 d51ae04c Michael Hanselmann
1473 d51ae04c Michael Hanselmann
        # Import daemon options
1474 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1475 d51ae04c Michael Hanselmann
                                           ca_pem=source_ca_pem,
1476 ba5619c2 Michael Hanselmann
                                           magic=magic, ipv6=ipv6)
1477 d51ae04c Michael Hanselmann
1478 eb630f50 Michael Hanselmann
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1479 5e26c4d9 Iustin Pop
                              "disk%d" % idx,
1480 9bf56d77 Michael Hanselmann
                              constants.IEIO_SCRIPT, (dev, idx),
1481 9bf56d77 Michael Hanselmann
                              timeouts, cbs, private=(idx, )))
1482 9bf56d77 Michael Hanselmann
1483 9bf56d77 Michael Hanselmann
      ieloop.Run()
1484 9bf56d77 Michael Hanselmann
    finally:
1485 9bf56d77 Michael Hanselmann
      ieloop.FinalizeAll()
1486 9bf56d77 Michael Hanselmann
  finally:
1487 9bf56d77 Michael Hanselmann
    # Remove crypto key and certificate
1488 9bf56d77 Michael Hanselmann
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1489 9bf56d77 Michael Hanselmann
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1490 9bf56d77 Michael Hanselmann
1491 9bf56d77 Michael Hanselmann
  return cbs.disk_results
1492 9bf56d77 Michael Hanselmann
1493 9bf56d77 Michael Hanselmann
1494 1410fa8d Michael Hanselmann
def _GetImportExportHandshakeMessage(version):
1495 1410fa8d Michael Hanselmann
  """Returns the handshake message for a RIE protocol version.
1496 1410fa8d Michael Hanselmann

1497 1410fa8d Michael Hanselmann
  @type version: number
1498 1410fa8d Michael Hanselmann

1499 1410fa8d Michael Hanselmann
  """
1500 1410fa8d Michael Hanselmann
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1501 1410fa8d Michael Hanselmann
1502 1410fa8d Michael Hanselmann
1503 1410fa8d Michael Hanselmann
def ComputeRemoteExportHandshake(cds):
1504 1410fa8d Michael Hanselmann
  """Computes the remote import/export handshake.
1505 1410fa8d Michael Hanselmann

1506 1410fa8d Michael Hanselmann
  @type cds: string
1507 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1508 1410fa8d Michael Hanselmann

1509 1410fa8d Michael Hanselmann
  """
1510 1410fa8d Michael Hanselmann
  salt = utils.GenerateSecret(8)
1511 1410fa8d Michael Hanselmann
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1512 1410fa8d Michael Hanselmann
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1513 1410fa8d Michael Hanselmann
1514 1410fa8d Michael Hanselmann
1515 1410fa8d Michael Hanselmann
def CheckRemoteExportHandshake(cds, handshake):
1516 1410fa8d Michael Hanselmann
  """Checks the handshake of a remote import/export.
1517 1410fa8d Michael Hanselmann

1518 1410fa8d Michael Hanselmann
  @type cds: string
1519 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1520 1410fa8d Michael Hanselmann
  @type handshake: sequence
1521 1410fa8d Michael Hanselmann
  @param handshake: Handshake sent by remote peer
1522 1410fa8d Michael Hanselmann

1523 1410fa8d Michael Hanselmann
  """
1524 1410fa8d Michael Hanselmann
  try:
1525 1410fa8d Michael Hanselmann
    (version, hmac_digest, hmac_salt) = handshake
1526 1410fa8d Michael Hanselmann
  except (TypeError, ValueError), err:
1527 1410fa8d Michael Hanselmann
    return "Invalid data: %s" % err
1528 1410fa8d Michael Hanselmann
1529 1410fa8d Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1530 1410fa8d Michael Hanselmann
                              hmac_digest, salt=hmac_salt):
1531 1410fa8d Michael Hanselmann
    return "Hash didn't match, clusters don't share the same domain secret"
1532 1410fa8d Michael Hanselmann
1533 1410fa8d Michael Hanselmann
  if version != constants.RIE_VERSION:
1534 1410fa8d Michael Hanselmann
    return ("Clusters don't have the same remote import/export protocol"
1535 1410fa8d Michael Hanselmann
            " (local=%s, remote=%s)" %
1536 1410fa8d Michael Hanselmann
            (constants.RIE_VERSION, version))
1537 1410fa8d Michael Hanselmann
1538 1410fa8d Michael Hanselmann
  return None
1539 4a96f1d1 Michael Hanselmann
1540 4a96f1d1 Michael Hanselmann
1541 d51ae04c Michael Hanselmann
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1542 4a96f1d1 Michael Hanselmann
  """Returns the hashed text for import/export disk information.
1543 4a96f1d1 Michael Hanselmann

1544 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1545 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1546 4a96f1d1 Michael Hanselmann
  @type host: string
1547 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1548 4a96f1d1 Michael Hanselmann
  @type port: number
1549 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1550 d51ae04c Michael Hanselmann
  @type magic: string
1551 d51ae04c Michael Hanselmann
  @param magic: Magic value
1552 4a96f1d1 Michael Hanselmann

1553 4a96f1d1 Michael Hanselmann
  """
1554 d51ae04c Michael Hanselmann
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1555 4a96f1d1 Michael Hanselmann
1556 4a96f1d1 Michael Hanselmann
1557 4a96f1d1 Michael Hanselmann
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1558 4a96f1d1 Michael Hanselmann
  """Verifies received disk information for an export.
1559 4a96f1d1 Michael Hanselmann

1560 4a96f1d1 Michael Hanselmann
  @type cds: string
1561 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1562 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1563 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1564 4a96f1d1 Michael Hanselmann
  @type disk_info: sequence
1565 4a96f1d1 Michael Hanselmann
  @param disk_info: Disk information sent by remote peer
1566 4a96f1d1 Michael Hanselmann

1567 4a96f1d1 Michael Hanselmann
  """
1568 4a96f1d1 Michael Hanselmann
  try:
1569 d51ae04c Michael Hanselmann
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1570 4a96f1d1 Michael Hanselmann
  except (TypeError, ValueError), err:
1571 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("Invalid data: %s" % err)
1572 4a96f1d1 Michael Hanselmann
1573 d51ae04c Michael Hanselmann
  if not (host and port and magic):
1574 d51ae04c Michael Hanselmann
    raise errors.GenericError("Missing destination host, port or magic")
1575 4a96f1d1 Michael Hanselmann
1576 d51ae04c Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1577 4a96f1d1 Michael Hanselmann
1578 4a96f1d1 Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1579 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("HMAC is wrong")
1580 4a96f1d1 Michael Hanselmann
1581 ba5619c2 Michael Hanselmann
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1582 ba5619c2 Michael Hanselmann
    destination = host
1583 ba5619c2 Michael Hanselmann
  else:
1584 ba5619c2 Michael Hanselmann
    destination = netutils.Hostname.GetNormalizedName(host)
1585 ba5619c2 Michael Hanselmann
1586 ba5619c2 Michael Hanselmann
  return (destination,
1587 d51ae04c Michael Hanselmann
          utils.ValidateServiceName(port),
1588 d51ae04c Michael Hanselmann
          magic)
1589 4a96f1d1 Michael Hanselmann
1590 4a96f1d1 Michael Hanselmann
1591 d51ae04c Michael Hanselmann
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1592 4a96f1d1 Michael Hanselmann
  """Computes the signed disk information for a remote import.
1593 4a96f1d1 Michael Hanselmann

1594 4a96f1d1 Michael Hanselmann
  @type cds: string
1595 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1596 4a96f1d1 Michael Hanselmann
  @type salt: string
1597 4a96f1d1 Michael Hanselmann
  @param salt: HMAC salt
1598 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1599 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1600 4a96f1d1 Michael Hanselmann
  @type host: string
1601 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1602 4a96f1d1 Michael Hanselmann
  @type port: number
1603 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1604 d51ae04c Michael Hanselmann
  @type magic: string
1605 d51ae04c Michael Hanselmann
  @param magic: Magic value
1606 4a96f1d1 Michael Hanselmann

1607 4a96f1d1 Michael Hanselmann
  """
1608 d51ae04c Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1609 4a96f1d1 Michael Hanselmann
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1610 d51ae04c Michael Hanselmann
  return (host, port, magic, hmac_digest, salt)