Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ 62bfbc7d

History | View | Annotate | Download (44.8 kB)

1 033a1d00 Michael Hanselmann
#
2 033a1d00 Michael Hanselmann
#
3 033a1d00 Michael Hanselmann
4 c9300bb3 Iustin Pop
# Copyright (C) 2010, 2011 Google Inc.
5 033a1d00 Michael Hanselmann
#
6 033a1d00 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 033a1d00 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 033a1d00 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 033a1d00 Michael Hanselmann
# (at your option) any later version.
10 033a1d00 Michael Hanselmann
#
11 033a1d00 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 033a1d00 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 033a1d00 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 033a1d00 Michael Hanselmann
# General Public License for more details.
15 033a1d00 Michael Hanselmann
#
16 033a1d00 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 033a1d00 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 033a1d00 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 033a1d00 Michael Hanselmann
# 02110-1301, USA.
20 033a1d00 Michael Hanselmann
21 033a1d00 Michael Hanselmann
22 033a1d00 Michael Hanselmann
"""Instance-related functions and classes for masterd.
23 033a1d00 Michael Hanselmann

24 033a1d00 Michael Hanselmann
"""
25 033a1d00 Michael Hanselmann
26 033a1d00 Michael Hanselmann
import logging
27 033a1d00 Michael Hanselmann
import time
28 4a96f1d1 Michael Hanselmann
import OpenSSL
29 033a1d00 Michael Hanselmann
30 033a1d00 Michael Hanselmann
from ganeti import constants
31 033a1d00 Michael Hanselmann
from ganeti import errors
32 033a1d00 Michael Hanselmann
from ganeti import compat
33 387794f8 Michael Hanselmann
from ganeti import utils
34 387794f8 Michael Hanselmann
from ganeti import objects
35 a744b676 Manuel Franceschini
from ganeti import netutils
36 033a1d00 Michael Hanselmann
37 033a1d00 Michael Hanselmann
38 033a1d00 Michael Hanselmann
class _ImportExportError(Exception):
39 033a1d00 Michael Hanselmann
  """Local exception to report import/export errors.
40 033a1d00 Michael Hanselmann

41 033a1d00 Michael Hanselmann
  """
42 033a1d00 Michael Hanselmann
43 033a1d00 Michael Hanselmann
44 033a1d00 Michael Hanselmann
class ImportExportTimeouts(object):
45 033a1d00 Michael Hanselmann
  #: Time until daemon starts writing status file
46 033a1d00 Michael Hanselmann
  DEFAULT_READY_TIMEOUT = 10
47 033a1d00 Michael Hanselmann
48 033a1d00 Michael Hanselmann
  #: Length of time until errors cause hard failure
49 033a1d00 Michael Hanselmann
  DEFAULT_ERROR_TIMEOUT = 10
50 033a1d00 Michael Hanselmann
51 033a1d00 Michael Hanselmann
  #: Time after which daemon must be listening
52 033a1d00 Michael Hanselmann
  DEFAULT_LISTEN_TIMEOUT = 10
53 033a1d00 Michael Hanselmann
54 1a2e7fe9 Michael Hanselmann
  #: Progress update interval
55 1a2e7fe9 Michael Hanselmann
  DEFAULT_PROGRESS_INTERVAL = 60
56 1a2e7fe9 Michael Hanselmann
57 033a1d00 Michael Hanselmann
  __slots__ = [
58 033a1d00 Michael Hanselmann
    "error",
59 033a1d00 Michael Hanselmann
    "ready",
60 033a1d00 Michael Hanselmann
    "listen",
61 033a1d00 Michael Hanselmann
    "connect",
62 1a2e7fe9 Michael Hanselmann
    "progress",
63 033a1d00 Michael Hanselmann
    ]
64 033a1d00 Michael Hanselmann
65 033a1d00 Michael Hanselmann
  def __init__(self, connect,
66 033a1d00 Michael Hanselmann
               listen=DEFAULT_LISTEN_TIMEOUT,
67 033a1d00 Michael Hanselmann
               error=DEFAULT_ERROR_TIMEOUT,
68 1a2e7fe9 Michael Hanselmann
               ready=DEFAULT_READY_TIMEOUT,
69 1a2e7fe9 Michael Hanselmann
               progress=DEFAULT_PROGRESS_INTERVAL):
70 033a1d00 Michael Hanselmann
    """Initializes this class.
71 033a1d00 Michael Hanselmann

72 033a1d00 Michael Hanselmann
    @type connect: number
73 033a1d00 Michael Hanselmann
    @param connect: Timeout for establishing connection
74 033a1d00 Michael Hanselmann
    @type listen: number
75 033a1d00 Michael Hanselmann
    @param listen: Timeout for starting to listen for connections
76 033a1d00 Michael Hanselmann
    @type error: number
77 033a1d00 Michael Hanselmann
    @param error: Length of time until errors cause hard failure
78 033a1d00 Michael Hanselmann
    @type ready: number
79 033a1d00 Michael Hanselmann
    @param ready: Timeout for daemon to become ready
80 1a2e7fe9 Michael Hanselmann
    @type progress: number
81 1a2e7fe9 Michael Hanselmann
    @param progress: Progress update interval
82 033a1d00 Michael Hanselmann

83 033a1d00 Michael Hanselmann
    """
84 033a1d00 Michael Hanselmann
    self.error = error
85 033a1d00 Michael Hanselmann
    self.ready = ready
86 033a1d00 Michael Hanselmann
    self.listen = listen
87 033a1d00 Michael Hanselmann
    self.connect = connect
88 1a2e7fe9 Michael Hanselmann
    self.progress = progress
89 033a1d00 Michael Hanselmann
90 033a1d00 Michael Hanselmann
91 033a1d00 Michael Hanselmann
class ImportExportCbBase(object):
92 033a1d00 Michael Hanselmann
  """Callbacks for disk import/export.
93 033a1d00 Michael Hanselmann

94 033a1d00 Michael Hanselmann
  """
95 5e26c4d9 Iustin Pop
  def ReportListening(self, ie, private, component):
96 033a1d00 Michael Hanselmann
    """Called when daemon started listening.
97 033a1d00 Michael Hanselmann

98 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
99 033a1d00 Michael Hanselmann
    @param ie: Import/export object
100 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
101 5e26c4d9 Iustin Pop
    @param component: transfer component name
102 033a1d00 Michael Hanselmann

103 033a1d00 Michael Hanselmann
    """
104 033a1d00 Michael Hanselmann
105 033a1d00 Michael Hanselmann
  def ReportConnected(self, ie, private):
106 033a1d00 Michael Hanselmann
    """Called when a connection has been established.
107 033a1d00 Michael Hanselmann

108 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
109 033a1d00 Michael Hanselmann
    @param ie: Import/export object
110 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
111 033a1d00 Michael Hanselmann

112 033a1d00 Michael Hanselmann
    """
113 033a1d00 Michael Hanselmann
114 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
115 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
116 1a2e7fe9 Michael Hanselmann

117 1a2e7fe9 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
118 1a2e7fe9 Michael Hanselmann
    @param ie: Import/export object
119 1a2e7fe9 Michael Hanselmann
    @param private: Private data passed to import/export object
120 1a2e7fe9 Michael Hanselmann

121 1a2e7fe9 Michael Hanselmann
    """
122 1a2e7fe9 Michael Hanselmann
123 033a1d00 Michael Hanselmann
  def ReportFinished(self, ie, private):
124 033a1d00 Michael Hanselmann
    """Called when a transfer has finished.
125 033a1d00 Michael Hanselmann

126 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
127 033a1d00 Michael Hanselmann
    @param ie: Import/export object
128 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
129 033a1d00 Michael Hanselmann

130 033a1d00 Michael Hanselmann
    """
131 033a1d00 Michael Hanselmann
132 033a1d00 Michael Hanselmann
133 033a1d00 Michael Hanselmann
class _DiskImportExportBase(object):
134 033a1d00 Michael Hanselmann
  MODE_TEXT = None
135 033a1d00 Michael Hanselmann
136 eb630f50 Michael Hanselmann
  def __init__(self, lu, node_name, opts,
137 5e26c4d9 Iustin Pop
               instance, component, timeouts, cbs, private=None):
138 033a1d00 Michael Hanselmann
    """Initializes this class.
139 033a1d00 Michael Hanselmann

140 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
141 033a1d00 Michael Hanselmann
    @type node_name: string
142 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
143 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
144 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
145 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
146 033a1d00 Michael Hanselmann
    @param instance: Instance object
147 5e26c4d9 Iustin Pop
    @type component: string
148 5e26c4d9 Iustin Pop
    @param component: which part of the instance is being imported
149 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
150 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
151 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
152 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
153 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
154 033a1d00 Michael Hanselmann

155 033a1d00 Michael Hanselmann
    """
156 033a1d00 Michael Hanselmann
    assert self.MODE_TEXT
157 033a1d00 Michael Hanselmann
158 033a1d00 Michael Hanselmann
    self._lu = lu
159 033a1d00 Michael Hanselmann
    self.node_name = node_name
160 4478301b Michael Hanselmann
    self._opts = opts.Copy()
161 033a1d00 Michael Hanselmann
    self._instance = instance
162 5e26c4d9 Iustin Pop
    self._component = component
163 033a1d00 Michael Hanselmann
    self._timeouts = timeouts
164 033a1d00 Michael Hanselmann
    self._cbs = cbs
165 033a1d00 Michael Hanselmann
    self._private = private
166 033a1d00 Michael Hanselmann
167 4478301b Michael Hanselmann
    # Set master daemon's timeout in options for import/export daemon
168 4478301b Michael Hanselmann
    assert self._opts.connect_timeout is None
169 4478301b Michael Hanselmann
    self._opts.connect_timeout = timeouts.connect
170 4478301b Michael Hanselmann
171 033a1d00 Michael Hanselmann
    # Parent loop
172 033a1d00 Michael Hanselmann
    self._loop = None
173 033a1d00 Michael Hanselmann
174 033a1d00 Michael Hanselmann
    # Timestamps
175 033a1d00 Michael Hanselmann
    self._ts_begin = None
176 033a1d00 Michael Hanselmann
    self._ts_connected = None
177 033a1d00 Michael Hanselmann
    self._ts_finished = None
178 033a1d00 Michael Hanselmann
    self._ts_cleanup = None
179 1a2e7fe9 Michael Hanselmann
    self._ts_last_progress = None
180 033a1d00 Michael Hanselmann
    self._ts_last_error = None
181 033a1d00 Michael Hanselmann
182 033a1d00 Michael Hanselmann
    # Transfer status
183 033a1d00 Michael Hanselmann
    self.success = None
184 033a1d00 Michael Hanselmann
    self.final_message = None
185 033a1d00 Michael Hanselmann
186 033a1d00 Michael Hanselmann
    # Daemon status
187 033a1d00 Michael Hanselmann
    self._daemon_name = None
188 033a1d00 Michael Hanselmann
    self._daemon = None
189 033a1d00 Michael Hanselmann
190 033a1d00 Michael Hanselmann
  @property
191 033a1d00 Michael Hanselmann
  def recent_output(self):
192 033a1d00 Michael Hanselmann
    """Returns the most recent output from the daemon.
193 033a1d00 Michael Hanselmann

194 033a1d00 Michael Hanselmann
    """
195 033a1d00 Michael Hanselmann
    if self._daemon:
196 c9300bb3 Iustin Pop
      return "\n".join(self._daemon.recent_output)
197 033a1d00 Michael Hanselmann
198 033a1d00 Michael Hanselmann
    return None
199 033a1d00 Michael Hanselmann
200 033a1d00 Michael Hanselmann
  @property
201 1a2e7fe9 Michael Hanselmann
  def progress(self):
202 1a2e7fe9 Michael Hanselmann
    """Returns transfer progress information.
203 1a2e7fe9 Michael Hanselmann

204 1a2e7fe9 Michael Hanselmann
    """
205 1a2e7fe9 Michael Hanselmann
    if not self._daemon:
206 1a2e7fe9 Michael Hanselmann
      return None
207 1a2e7fe9 Michael Hanselmann
208 1a2e7fe9 Michael Hanselmann
    return (self._daemon.progress_mbytes,
209 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_throughput,
210 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_percent,
211 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_eta)
212 1a2e7fe9 Michael Hanselmann
213 1a2e7fe9 Michael Hanselmann
  @property
214 af1d39b1 Michael Hanselmann
  def magic(self):
215 af1d39b1 Michael Hanselmann
    """Returns the magic value for this import/export.
216 af1d39b1 Michael Hanselmann

217 af1d39b1 Michael Hanselmann
    """
218 af1d39b1 Michael Hanselmann
    return self._opts.magic
219 af1d39b1 Michael Hanselmann
220 af1d39b1 Michael Hanselmann
  @property
221 033a1d00 Michael Hanselmann
  def active(self):
222 033a1d00 Michael Hanselmann
    """Determines whether this transport is still active.
223 033a1d00 Michael Hanselmann

224 033a1d00 Michael Hanselmann
    """
225 033a1d00 Michael Hanselmann
    return self.success is None
226 033a1d00 Michael Hanselmann
227 033a1d00 Michael Hanselmann
  @property
228 033a1d00 Michael Hanselmann
  def loop(self):
229 033a1d00 Michael Hanselmann
    """Returns parent loop.
230 033a1d00 Michael Hanselmann

231 033a1d00 Michael Hanselmann
    @rtype: L{ImportExportLoop}
232 033a1d00 Michael Hanselmann

233 033a1d00 Michael Hanselmann
    """
234 033a1d00 Michael Hanselmann
    return self._loop
235 033a1d00 Michael Hanselmann
236 033a1d00 Michael Hanselmann
  def SetLoop(self, loop):
237 033a1d00 Michael Hanselmann
    """Sets the parent loop.
238 033a1d00 Michael Hanselmann

239 033a1d00 Michael Hanselmann
    @type loop: L{ImportExportLoop}
240 033a1d00 Michael Hanselmann

241 033a1d00 Michael Hanselmann
    """
242 033a1d00 Michael Hanselmann
    if self._loop:
243 033a1d00 Michael Hanselmann
      raise errors.ProgrammerError("Loop can only be set once")
244 033a1d00 Michael Hanselmann
245 033a1d00 Michael Hanselmann
    self._loop = loop
246 033a1d00 Michael Hanselmann
247 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
248 033a1d00 Michael Hanselmann
    """Starts the import/export daemon.
249 033a1d00 Michael Hanselmann

250 033a1d00 Michael Hanselmann
    """
251 033a1d00 Michael Hanselmann
    raise NotImplementedError()
252 033a1d00 Michael Hanselmann
253 033a1d00 Michael Hanselmann
  def CheckDaemon(self):
254 033a1d00 Michael Hanselmann
    """Checks whether daemon has been started and if not, starts it.
255 033a1d00 Michael Hanselmann

256 033a1d00 Michael Hanselmann
    @rtype: string
257 033a1d00 Michael Hanselmann
    @return: Daemon name
258 033a1d00 Michael Hanselmann

259 033a1d00 Michael Hanselmann
    """
260 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
261 033a1d00 Michael Hanselmann
262 033a1d00 Michael Hanselmann
    if self._daemon_name is None:
263 033a1d00 Michael Hanselmann
      assert self._ts_begin is None
264 033a1d00 Michael Hanselmann
265 033a1d00 Michael Hanselmann
      result = self._StartDaemon()
266 033a1d00 Michael Hanselmann
      if result.fail_msg:
267 033a1d00 Michael Hanselmann
        raise _ImportExportError("Failed to start %s on %s: %s" %
268 033a1d00 Michael Hanselmann
                                 (self.MODE_TEXT, self.node_name,
269 033a1d00 Michael Hanselmann
                                  result.fail_msg))
270 033a1d00 Michael Hanselmann
271 033a1d00 Michael Hanselmann
      daemon_name = result.payload
272 033a1d00 Michael Hanselmann
273 194e8648 Iustin Pop
      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
274 033a1d00 Michael Hanselmann
                   self.node_name)
275 033a1d00 Michael Hanselmann
276 033a1d00 Michael Hanselmann
      self._ts_begin = time.time()
277 033a1d00 Michael Hanselmann
      self._daemon_name = daemon_name
278 033a1d00 Michael Hanselmann
279 033a1d00 Michael Hanselmann
    return self._daemon_name
280 033a1d00 Michael Hanselmann
281 033a1d00 Michael Hanselmann
  def GetDaemonName(self):
282 033a1d00 Michael Hanselmann
    """Returns the daemon name.
283 033a1d00 Michael Hanselmann

284 033a1d00 Michael Hanselmann
    """
285 033a1d00 Michael Hanselmann
    assert self._daemon_name, "Daemon has not been started"
286 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
287 033a1d00 Michael Hanselmann
    return self._daemon_name
288 033a1d00 Michael Hanselmann
289 033a1d00 Michael Hanselmann
  def Abort(self):
290 033a1d00 Michael Hanselmann
    """Sends SIGTERM to import/export daemon (if still active).
291 033a1d00 Michael Hanselmann

292 033a1d00 Michael Hanselmann
    """
293 033a1d00 Michael Hanselmann
    if self._daemon_name:
294 194e8648 Iustin Pop
      self._lu.LogWarning("Aborting %s '%s' on %s",
295 033a1d00 Michael Hanselmann
                          self.MODE_TEXT, self._daemon_name, self.node_name)
296 033a1d00 Michael Hanselmann
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
297 033a1d00 Michael Hanselmann
      if result.fail_msg:
298 194e8648 Iustin Pop
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
299 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
300 033a1d00 Michael Hanselmann
                            self.node_name, result.fail_msg)
301 033a1d00 Michael Hanselmann
        return False
302 033a1d00 Michael Hanselmann
303 033a1d00 Michael Hanselmann
    return True
304 033a1d00 Michael Hanselmann
305 033a1d00 Michael Hanselmann
  def _SetDaemonData(self, data):
306 033a1d00 Michael Hanselmann
    """Internal function for updating status daemon data.
307 033a1d00 Michael Hanselmann

308 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
309 033a1d00 Michael Hanselmann
    @param data: Daemon status data
310 033a1d00 Michael Hanselmann

311 033a1d00 Michael Hanselmann
    """
312 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
313 033a1d00 Michael Hanselmann
314 033a1d00 Michael Hanselmann
    if not data:
315 f8326fca Andrea Spadaccini
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
316 033a1d00 Michael Hanselmann
        raise _ImportExportError("Didn't become ready after %s seconds" %
317 033a1d00 Michael Hanselmann
                                 self._timeouts.ready)
318 033a1d00 Michael Hanselmann
319 033a1d00 Michael Hanselmann
      return False
320 033a1d00 Michael Hanselmann
321 033a1d00 Michael Hanselmann
    self._daemon = data
322 033a1d00 Michael Hanselmann
323 033a1d00 Michael Hanselmann
    return True
324 033a1d00 Michael Hanselmann
325 033a1d00 Michael Hanselmann
  def SetDaemonData(self, success, data):
326 033a1d00 Michael Hanselmann
    """Updates daemon status data.
327 033a1d00 Michael Hanselmann

328 033a1d00 Michael Hanselmann
    @type success: bool
329 033a1d00 Michael Hanselmann
    @param success: Whether fetching data was successful or not
330 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
331 033a1d00 Michael Hanselmann
    @param data: Daemon status data
332 033a1d00 Michael Hanselmann

333 033a1d00 Michael Hanselmann
    """
334 033a1d00 Michael Hanselmann
    if not success:
335 033a1d00 Michael Hanselmann
      if self._ts_last_error is None:
336 033a1d00 Michael Hanselmann
        self._ts_last_error = time.time()
337 033a1d00 Michael Hanselmann
338 f8326fca Andrea Spadaccini
      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
339 033a1d00 Michael Hanselmann
        raise _ImportExportError("Too many errors while updating data")
340 033a1d00 Michael Hanselmann
341 033a1d00 Michael Hanselmann
      return False
342 033a1d00 Michael Hanselmann
343 033a1d00 Michael Hanselmann
    self._ts_last_error = None
344 033a1d00 Michael Hanselmann
345 033a1d00 Michael Hanselmann
    return self._SetDaemonData(data)
346 033a1d00 Michael Hanselmann
347 033a1d00 Michael Hanselmann
  def CheckListening(self):
348 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
349 033a1d00 Michael Hanselmann

350 033a1d00 Michael Hanselmann
    """
351 033a1d00 Michael Hanselmann
    raise NotImplementedError()
352 033a1d00 Michael Hanselmann
353 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
354 033a1d00 Michael Hanselmann
    """Returns timeout to calculate connect timeout.
355 033a1d00 Michael Hanselmann

356 033a1d00 Michael Hanselmann
    """
357 033a1d00 Michael Hanselmann
    raise NotImplementedError()
358 033a1d00 Michael Hanselmann
359 033a1d00 Michael Hanselmann
  def CheckConnected(self):
360 033a1d00 Michael Hanselmann
    """Checks whether the daemon is connected.
361 033a1d00 Michael Hanselmann

362 033a1d00 Michael Hanselmann
    @rtype: bool
363 033a1d00 Michael Hanselmann
    @return: Whether the daemon is connected
364 033a1d00 Michael Hanselmann

365 033a1d00 Michael Hanselmann
    """
366 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
367 033a1d00 Michael Hanselmann
368 033a1d00 Michael Hanselmann
    if self._ts_connected is not None:
369 033a1d00 Michael Hanselmann
      return True
370 033a1d00 Michael Hanselmann
371 033a1d00 Michael Hanselmann
    if self._daemon.connected:
372 033a1d00 Michael Hanselmann
      self._ts_connected = time.time()
373 033a1d00 Michael Hanselmann
374 033a1d00 Michael Hanselmann
      # TODO: Log remote peer
375 194e8648 Iustin Pop
      logging.debug("%s '%s' on %s is now connected",
376 033a1d00 Michael Hanselmann
                    self.MODE_TEXT, self._daemon_name, self.node_name)
377 033a1d00 Michael Hanselmann
378 033a1d00 Michael Hanselmann
      self._cbs.ReportConnected(self, self._private)
379 033a1d00 Michael Hanselmann
380 033a1d00 Michael Hanselmann
      return True
381 033a1d00 Michael Hanselmann
382 f8326fca Andrea Spadaccini
    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
383 f8326fca Andrea Spadaccini
                            self._timeouts.connect):
384 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not connected after %s seconds" %
385 033a1d00 Michael Hanselmann
                               self._timeouts.connect)
386 033a1d00 Michael Hanselmann
387 033a1d00 Michael Hanselmann
    return False
388 033a1d00 Michael Hanselmann
389 1a2e7fe9 Michael Hanselmann
  def _CheckProgress(self):
390 1a2e7fe9 Michael Hanselmann
    """Checks whether a progress update should be reported.
391 1a2e7fe9 Michael Hanselmann

392 1a2e7fe9 Michael Hanselmann
    """
393 1a2e7fe9 Michael Hanselmann
    if ((self._ts_last_progress is None or
394 f8326fca Andrea Spadaccini
        utils.TimeoutExpired(self._ts_last_progress,
395 f8326fca Andrea Spadaccini
                             self._timeouts.progress)) and
396 1a2e7fe9 Michael Hanselmann
        self._daemon and
397 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_mbytes is not None and
398 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_throughput is not None):
399 1a2e7fe9 Michael Hanselmann
      self._cbs.ReportProgress(self, self._private)
400 1a2e7fe9 Michael Hanselmann
      self._ts_last_progress = time.time()
401 1a2e7fe9 Michael Hanselmann
402 033a1d00 Michael Hanselmann
  def CheckFinished(self):
403 033a1d00 Michael Hanselmann
    """Checks whether the daemon exited.
404 033a1d00 Michael Hanselmann

405 033a1d00 Michael Hanselmann
    @rtype: bool
406 033a1d00 Michael Hanselmann
    @return: Whether the transfer is finished
407 033a1d00 Michael Hanselmann

408 033a1d00 Michael Hanselmann
    """
409 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
410 033a1d00 Michael Hanselmann
411 033a1d00 Michael Hanselmann
    if self._ts_finished:
412 033a1d00 Michael Hanselmann
      return True
413 033a1d00 Michael Hanselmann
414 033a1d00 Michael Hanselmann
    if self._daemon.exit_status is None:
415 1a2e7fe9 Michael Hanselmann
      # TODO: Adjust delay for ETA expiring soon
416 1a2e7fe9 Michael Hanselmann
      self._CheckProgress()
417 033a1d00 Michael Hanselmann
      return False
418 033a1d00 Michael Hanselmann
419 033a1d00 Michael Hanselmann
    self._ts_finished = time.time()
420 033a1d00 Michael Hanselmann
421 033a1d00 Michael Hanselmann
    self._ReportFinished(self._daemon.exit_status == 0,
422 033a1d00 Michael Hanselmann
                         self._daemon.error_message)
423 033a1d00 Michael Hanselmann
424 033a1d00 Michael Hanselmann
    return True
425 033a1d00 Michael Hanselmann
426 033a1d00 Michael Hanselmann
  def _ReportFinished(self, success, message):
427 033a1d00 Michael Hanselmann
    """Transfer is finished or daemon exited.
428 033a1d00 Michael Hanselmann

429 033a1d00 Michael Hanselmann
    @type success: bool
430 033a1d00 Michael Hanselmann
    @param success: Whether the transfer was successful
431 033a1d00 Michael Hanselmann
    @type message: string
432 033a1d00 Michael Hanselmann
    @param message: Error message
433 033a1d00 Michael Hanselmann

434 033a1d00 Michael Hanselmann
    """
435 033a1d00 Michael Hanselmann
    assert self.success is None
436 033a1d00 Michael Hanselmann
437 033a1d00 Michael Hanselmann
    self.success = success
438 033a1d00 Michael Hanselmann
    self.final_message = message
439 033a1d00 Michael Hanselmann
440 033a1d00 Michael Hanselmann
    if success:
441 194e8648 Iustin Pop
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
442 194e8648 Iustin Pop
                   self._daemon_name, self.node_name)
443 033a1d00 Michael Hanselmann
    elif self._daemon_name:
444 194e8648 Iustin Pop
      self._lu.LogWarning("%s '%s' on %s failed: %s",
445 033a1d00 Michael Hanselmann
                          self.MODE_TEXT, self._daemon_name, self.node_name,
446 033a1d00 Michael Hanselmann
                          message)
447 033a1d00 Michael Hanselmann
    else:
448 033a1d00 Michael Hanselmann
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
449 033a1d00 Michael Hanselmann
                          self.node_name, message)
450 033a1d00 Michael Hanselmann
451 033a1d00 Michael Hanselmann
    self._cbs.ReportFinished(self, self._private)
452 033a1d00 Michael Hanselmann
453 033a1d00 Michael Hanselmann
  def _Finalize(self):
454 033a1d00 Michael Hanselmann
    """Makes the RPC call to finalize this import/export.
455 033a1d00 Michael Hanselmann

456 033a1d00 Michael Hanselmann
    """
457 033a1d00 Michael Hanselmann
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
458 033a1d00 Michael Hanselmann
459 033a1d00 Michael Hanselmann
  def Finalize(self, error=None):
460 033a1d00 Michael Hanselmann
    """Finalizes this import/export.
461 033a1d00 Michael Hanselmann

462 033a1d00 Michael Hanselmann
    """
463 033a1d00 Michael Hanselmann
    if self._daemon_name:
464 194e8648 Iustin Pop
      logging.info("Finalizing %s '%s' on %s",
465 033a1d00 Michael Hanselmann
                   self.MODE_TEXT, self._daemon_name, self.node_name)
466 033a1d00 Michael Hanselmann
467 033a1d00 Michael Hanselmann
      result = self._Finalize()
468 033a1d00 Michael Hanselmann
      if result.fail_msg:
469 194e8648 Iustin Pop
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
470 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
471 033a1d00 Michael Hanselmann
                            self.node_name, result.fail_msg)
472 033a1d00 Michael Hanselmann
        return False
473 033a1d00 Michael Hanselmann
474 033a1d00 Michael Hanselmann
      # Daemon is no longer running
475 033a1d00 Michael Hanselmann
      self._daemon_name = None
476 033a1d00 Michael Hanselmann
      self._ts_cleanup = time.time()
477 033a1d00 Michael Hanselmann
478 033a1d00 Michael Hanselmann
    if error:
479 033a1d00 Michael Hanselmann
      self._ReportFinished(False, error)
480 033a1d00 Michael Hanselmann
481 033a1d00 Michael Hanselmann
    return True
482 033a1d00 Michael Hanselmann
483 033a1d00 Michael Hanselmann
484 033a1d00 Michael Hanselmann
class DiskImport(_DiskImportExportBase):
485 033a1d00 Michael Hanselmann
  MODE_TEXT = "import"
486 033a1d00 Michael Hanselmann
487 5e26c4d9 Iustin Pop
  def __init__(self, lu, node_name, opts, instance, component,
488 033a1d00 Michael Hanselmann
               dest, dest_args, timeouts, cbs, private=None):
489 033a1d00 Michael Hanselmann
    """Initializes this class.
490 033a1d00 Michael Hanselmann

491 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
492 033a1d00 Michael Hanselmann
    @type node_name: string
493 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
494 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
495 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
496 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
497 033a1d00 Michael Hanselmann
    @param instance: Instance object
498 5e26c4d9 Iustin Pop
    @type component: string
499 5e26c4d9 Iustin Pop
    @param component: which part of the instance is being imported
500 033a1d00 Michael Hanselmann
    @param dest: I/O destination
501 033a1d00 Michael Hanselmann
    @param dest_args: I/O arguments
502 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
503 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
504 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
505 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
506 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
507 033a1d00 Michael Hanselmann

508 033a1d00 Michael Hanselmann
    """
509 5e26c4d9 Iustin Pop
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
510 5e26c4d9 Iustin Pop
                                   component, timeouts, cbs, private)
511 033a1d00 Michael Hanselmann
    self._dest = dest
512 033a1d00 Michael Hanselmann
    self._dest_args = dest_args
513 033a1d00 Michael Hanselmann
514 033a1d00 Michael Hanselmann
    # Timestamps
515 033a1d00 Michael Hanselmann
    self._ts_listening = None
516 033a1d00 Michael Hanselmann
517 033a1d00 Michael Hanselmann
  @property
518 033a1d00 Michael Hanselmann
  def listen_port(self):
519 033a1d00 Michael Hanselmann
    """Returns the port the daemon is listening on.
520 033a1d00 Michael Hanselmann

521 033a1d00 Michael Hanselmann
    """
522 033a1d00 Michael Hanselmann
    if self._daemon:
523 033a1d00 Michael Hanselmann
      return self._daemon.listen_port
524 033a1d00 Michael Hanselmann
525 033a1d00 Michael Hanselmann
    return None
526 033a1d00 Michael Hanselmann
527 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
528 033a1d00 Michael Hanselmann
    """Starts the import daemon.
529 033a1d00 Michael Hanselmann

530 033a1d00 Michael Hanselmann
    """
531 eb630f50 Michael Hanselmann
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
532 6613661a Iustin Pop
                                          self._instance, self._component,
533 b8c160c1 Michael Hanselmann
                                          (self._dest, self._dest_args))
534 033a1d00 Michael Hanselmann
535 033a1d00 Michael Hanselmann
  def CheckListening(self):
536 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
537 033a1d00 Michael Hanselmann

538 033a1d00 Michael Hanselmann
    @rtype: bool
539 033a1d00 Michael Hanselmann
    @return: Whether the daemon is listening
540 033a1d00 Michael Hanselmann

541 033a1d00 Michael Hanselmann
    """
542 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
543 033a1d00 Michael Hanselmann
544 033a1d00 Michael Hanselmann
    if self._ts_listening is not None:
545 033a1d00 Michael Hanselmann
      return True
546 033a1d00 Michael Hanselmann
547 033a1d00 Michael Hanselmann
    port = self._daemon.listen_port
548 033a1d00 Michael Hanselmann
    if port is not None:
549 033a1d00 Michael Hanselmann
      self._ts_listening = time.time()
550 033a1d00 Michael Hanselmann
551 194e8648 Iustin Pop
      logging.debug("Import '%s' on %s is now listening on port %s",
552 033a1d00 Michael Hanselmann
                    self._daemon_name, self.node_name, port)
553 033a1d00 Michael Hanselmann
554 5e26c4d9 Iustin Pop
      self._cbs.ReportListening(self, self._private, self._component)
555 033a1d00 Michael Hanselmann
556 033a1d00 Michael Hanselmann
      return True
557 033a1d00 Michael Hanselmann
558 f8326fca Andrea Spadaccini
    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
559 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not listening after %s seconds" %
560 033a1d00 Michael Hanselmann
                               self._timeouts.listen)
561 033a1d00 Michael Hanselmann
562 033a1d00 Michael Hanselmann
    return False
563 033a1d00 Michael Hanselmann
564 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
565 033a1d00 Michael Hanselmann
    """Returns the time since we started listening.
566 033a1d00 Michael Hanselmann

567 033a1d00 Michael Hanselmann
    """
568 033a1d00 Michael Hanselmann
    assert self._ts_listening is not None, \
569 033a1d00 Michael Hanselmann
           ("Checking whether an import is connected is only useful"
570 033a1d00 Michael Hanselmann
            " once it's been listening")
571 033a1d00 Michael Hanselmann
572 033a1d00 Michael Hanselmann
    return self._ts_listening
573 033a1d00 Michael Hanselmann
574 033a1d00 Michael Hanselmann
575 033a1d00 Michael Hanselmann
class DiskExport(_DiskImportExportBase):
576 033a1d00 Michael Hanselmann
  MODE_TEXT = "export"
577 033a1d00 Michael Hanselmann
578 5e26c4d9 Iustin Pop
  def __init__(self, lu, node_name, opts, dest_host, dest_port,
579 5e26c4d9 Iustin Pop
               instance, component, source, source_args,
580 033a1d00 Michael Hanselmann
               timeouts, cbs, private=None):
581 033a1d00 Michael Hanselmann
    """Initializes this class.
582 033a1d00 Michael Hanselmann

583 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
584 033a1d00 Michael Hanselmann
    @type node_name: string
585 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
586 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
587 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
588 033a1d00 Michael Hanselmann
    @type dest_host: string
589 033a1d00 Michael Hanselmann
    @param dest_host: Destination host name or IP address
590 033a1d00 Michael Hanselmann
    @type dest_port: number
591 033a1d00 Michael Hanselmann
    @param dest_port: Destination port number
592 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
593 033a1d00 Michael Hanselmann
    @param instance: Instance object
594 5e26c4d9 Iustin Pop
    @type component: string
595 5e26c4d9 Iustin Pop
    @param component: which part of the instance is being imported
596 033a1d00 Michael Hanselmann
    @param source: I/O source
597 033a1d00 Michael Hanselmann
    @param source_args: I/O source
598 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
599 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
600 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
601 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
602 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
603 033a1d00 Michael Hanselmann

604 033a1d00 Michael Hanselmann
    """
605 5e26c4d9 Iustin Pop
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
606 5e26c4d9 Iustin Pop
                                   component, timeouts, cbs, private)
607 033a1d00 Michael Hanselmann
    self._dest_host = dest_host
608 033a1d00 Michael Hanselmann
    self._dest_port = dest_port
609 033a1d00 Michael Hanselmann
    self._source = source
610 033a1d00 Michael Hanselmann
    self._source_args = source_args
611 033a1d00 Michael Hanselmann
612 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
613 033a1d00 Michael Hanselmann
    """Starts the export daemon.
614 033a1d00 Michael Hanselmann

615 033a1d00 Michael Hanselmann
    """
616 eb630f50 Michael Hanselmann
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
617 033a1d00 Michael Hanselmann
                                          self._dest_host, self._dest_port,
618 6613661a Iustin Pop
                                          self._instance, self._component,
619 b8c160c1 Michael Hanselmann
                                          (self._source, self._source_args))
620 033a1d00 Michael Hanselmann
621 033a1d00 Michael Hanselmann
  def CheckListening(self):
622 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
623 033a1d00 Michael Hanselmann

624 033a1d00 Michael Hanselmann
    """
625 033a1d00 Michael Hanselmann
    # Only an import can be listening
626 033a1d00 Michael Hanselmann
    return True
627 033a1d00 Michael Hanselmann
628 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
629 033a1d00 Michael Hanselmann
    """Returns the time since the daemon started.
630 033a1d00 Michael Hanselmann

631 033a1d00 Michael Hanselmann
    """
632 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
633 033a1d00 Michael Hanselmann
634 033a1d00 Michael Hanselmann
    return self._ts_begin
635 033a1d00 Michael Hanselmann
636 033a1d00 Michael Hanselmann
637 1a2e7fe9 Michael Hanselmann
def FormatProgress(progress):
638 1a2e7fe9 Michael Hanselmann
  """Formats progress information for user consumption
639 1a2e7fe9 Michael Hanselmann

640 1a2e7fe9 Michael Hanselmann
  """
641 e6b8d02d Michael Hanselmann
  (mbytes, throughput, percent, eta) = progress
642 1a2e7fe9 Michael Hanselmann
643 1a2e7fe9 Michael Hanselmann
  parts = [
644 1a2e7fe9 Michael Hanselmann
    utils.FormatUnit(mbytes, "h"),
645 1a2e7fe9 Michael Hanselmann
646 1a2e7fe9 Michael Hanselmann
    # Not using FormatUnit as it doesn't support kilobytes
647 1a2e7fe9 Michael Hanselmann
    "%0.1f MiB/s" % throughput,
648 1a2e7fe9 Michael Hanselmann
    ]
649 1a2e7fe9 Michael Hanselmann
650 1a2e7fe9 Michael Hanselmann
  if percent is not None:
651 1a2e7fe9 Michael Hanselmann
    parts.append("%d%%" % percent)
652 1a2e7fe9 Michael Hanselmann
653 e6b8d02d Michael Hanselmann
  if eta is not None:
654 e6b8d02d Michael Hanselmann
    parts.append("ETA %s" % utils.FormatSeconds(eta))
655 1a2e7fe9 Michael Hanselmann
656 1a2e7fe9 Michael Hanselmann
  return utils.CommaJoin(parts)
657 1a2e7fe9 Michael Hanselmann
658 1a2e7fe9 Michael Hanselmann
659 033a1d00 Michael Hanselmann
class ImportExportLoop:
660 033a1d00 Michael Hanselmann
  MIN_DELAY = 1.0
661 033a1d00 Michael Hanselmann
  MAX_DELAY = 20.0
662 033a1d00 Michael Hanselmann
663 033a1d00 Michael Hanselmann
  def __init__(self, lu):
664 033a1d00 Michael Hanselmann
    """Initializes this class.
665 033a1d00 Michael Hanselmann

666 033a1d00 Michael Hanselmann
    """
667 033a1d00 Michael Hanselmann
    self._lu = lu
668 033a1d00 Michael Hanselmann
    self._queue = []
669 033a1d00 Michael Hanselmann
    self._pending_add = []
670 033a1d00 Michael Hanselmann
671 033a1d00 Michael Hanselmann
  def Add(self, diskie):
672 033a1d00 Michael Hanselmann
    """Adds an import/export object to the loop.
673 033a1d00 Michael Hanselmann

674 033a1d00 Michael Hanselmann
    @type diskie: Subclass of L{_DiskImportExportBase}
675 033a1d00 Michael Hanselmann
    @param diskie: Import/export object
676 033a1d00 Michael Hanselmann

677 033a1d00 Michael Hanselmann
    """
678 033a1d00 Michael Hanselmann
    assert diskie not in self._pending_add
679 033a1d00 Michael Hanselmann
    assert diskie.loop is None
680 033a1d00 Michael Hanselmann
681 033a1d00 Michael Hanselmann
    diskie.SetLoop(self)
682 033a1d00 Michael Hanselmann
683 033a1d00 Michael Hanselmann
    # Adding new objects to a staging list is necessary, otherwise the main
684 033a1d00 Michael Hanselmann
    # loop gets confused if callbacks modify the queue while the main loop is
685 033a1d00 Michael Hanselmann
    # iterating over it.
686 033a1d00 Michael Hanselmann
    self._pending_add.append(diskie)
687 033a1d00 Michael Hanselmann
688 033a1d00 Michael Hanselmann
  @staticmethod
689 033a1d00 Michael Hanselmann
  def _CollectDaemonStatus(lu, daemons):
690 033a1d00 Michael Hanselmann
    """Collects the status for all import/export daemons.
691 033a1d00 Michael Hanselmann

692 033a1d00 Michael Hanselmann
    """
693 033a1d00 Michael Hanselmann
    daemon_status = {}
694 033a1d00 Michael Hanselmann
695 033a1d00 Michael Hanselmann
    for node_name, names in daemons.iteritems():
696 033a1d00 Michael Hanselmann
      result = lu.rpc.call_impexp_status(node_name, names)
697 033a1d00 Michael Hanselmann
      if result.fail_msg:
698 033a1d00 Michael Hanselmann
        lu.LogWarning("Failed to get daemon status on %s: %s",
699 033a1d00 Michael Hanselmann
                      node_name, result.fail_msg)
700 033a1d00 Michael Hanselmann
        continue
701 033a1d00 Michael Hanselmann
702 033a1d00 Michael Hanselmann
      assert len(names) == len(result.payload)
703 033a1d00 Michael Hanselmann
704 033a1d00 Michael Hanselmann
      daemon_status[node_name] = dict(zip(names, result.payload))
705 033a1d00 Michael Hanselmann
706 033a1d00 Michael Hanselmann
    return daemon_status
707 033a1d00 Michael Hanselmann
708 033a1d00 Michael Hanselmann
  @staticmethod
709 033a1d00 Michael Hanselmann
  def _GetActiveDaemonNames(queue):
710 033a1d00 Michael Hanselmann
    """Gets the names of all active daemons.
711 033a1d00 Michael Hanselmann

712 033a1d00 Michael Hanselmann
    """
713 033a1d00 Michael Hanselmann
    result = {}
714 033a1d00 Michael Hanselmann
    for diskie in queue:
715 033a1d00 Michael Hanselmann
      if not diskie.active:
716 033a1d00 Michael Hanselmann
        continue
717 033a1d00 Michael Hanselmann
718 033a1d00 Michael Hanselmann
      try:
719 033a1d00 Michael Hanselmann
        # Start daemon if necessary
720 033a1d00 Michael Hanselmann
        daemon_name = diskie.CheckDaemon()
721 033a1d00 Michael Hanselmann
      except _ImportExportError, err:
722 033a1d00 Michael Hanselmann
        logging.exception("%s failed", diskie.MODE_TEXT)
723 033a1d00 Michael Hanselmann
        diskie.Finalize(error=str(err))
724 033a1d00 Michael Hanselmann
        continue
725 033a1d00 Michael Hanselmann
726 033a1d00 Michael Hanselmann
      result.setdefault(diskie.node_name, []).append(daemon_name)
727 033a1d00 Michael Hanselmann
728 033a1d00 Michael Hanselmann
    assert len(queue) >= len(result)
729 033a1d00 Michael Hanselmann
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
730 033a1d00 Michael Hanselmann
731 033a1d00 Michael Hanselmann
    logging.debug("daemons=%r", result)
732 033a1d00 Michael Hanselmann
733 033a1d00 Michael Hanselmann
    return result
734 033a1d00 Michael Hanselmann
735 033a1d00 Michael Hanselmann
  def _AddPendingToQueue(self):
736 033a1d00 Michael Hanselmann
    """Adds all pending import/export objects to the internal queue.
737 033a1d00 Michael Hanselmann

738 033a1d00 Michael Hanselmann
    """
739 033a1d00 Michael Hanselmann
    assert compat.all(diskie not in self._queue and diskie.loop == self
740 033a1d00 Michael Hanselmann
                      for diskie in self._pending_add)
741 033a1d00 Michael Hanselmann
742 033a1d00 Michael Hanselmann
    self._queue.extend(self._pending_add)
743 033a1d00 Michael Hanselmann
744 033a1d00 Michael Hanselmann
    del self._pending_add[:]
745 033a1d00 Michael Hanselmann
746 033a1d00 Michael Hanselmann
  def Run(self):
747 033a1d00 Michael Hanselmann
    """Utility main loop.
748 033a1d00 Michael Hanselmann

749 033a1d00 Michael Hanselmann
    """
750 033a1d00 Michael Hanselmann
    while True:
751 033a1d00 Michael Hanselmann
      self._AddPendingToQueue()
752 033a1d00 Michael Hanselmann
753 033a1d00 Michael Hanselmann
      # Collect all active daemon names
754 033a1d00 Michael Hanselmann
      daemons = self._GetActiveDaemonNames(self._queue)
755 033a1d00 Michael Hanselmann
      if not daemons:
756 033a1d00 Michael Hanselmann
        break
757 033a1d00 Michael Hanselmann
758 033a1d00 Michael Hanselmann
      # Collection daemon status data
759 033a1d00 Michael Hanselmann
      data = self._CollectDaemonStatus(self._lu, daemons)
760 033a1d00 Michael Hanselmann
761 033a1d00 Michael Hanselmann
      # Use data
762 033a1d00 Michael Hanselmann
      delay = self.MAX_DELAY
763 033a1d00 Michael Hanselmann
      for diskie in self._queue:
764 033a1d00 Michael Hanselmann
        if not diskie.active:
765 033a1d00 Michael Hanselmann
          continue
766 033a1d00 Michael Hanselmann
767 033a1d00 Michael Hanselmann
        try:
768 033a1d00 Michael Hanselmann
          try:
769 033a1d00 Michael Hanselmann
            all_daemon_data = data[diskie.node_name]
770 033a1d00 Michael Hanselmann
          except KeyError:
771 033a1d00 Michael Hanselmann
            result = diskie.SetDaemonData(False, None)
772 033a1d00 Michael Hanselmann
          else:
773 033a1d00 Michael Hanselmann
            result = \
774 033a1d00 Michael Hanselmann
              diskie.SetDaemonData(True,
775 033a1d00 Michael Hanselmann
                                   all_daemon_data[diskie.GetDaemonName()])
776 033a1d00 Michael Hanselmann
777 033a1d00 Michael Hanselmann
          if not result:
778 033a1d00 Michael Hanselmann
            # Daemon not yet ready, retry soon
779 033a1d00 Michael Hanselmann
            delay = min(3.0, delay)
780 033a1d00 Michael Hanselmann
            continue
781 033a1d00 Michael Hanselmann
782 033a1d00 Michael Hanselmann
          if diskie.CheckFinished():
783 033a1d00 Michael Hanselmann
            # Transfer finished
784 033a1d00 Michael Hanselmann
            diskie.Finalize()
785 033a1d00 Michael Hanselmann
            continue
786 033a1d00 Michael Hanselmann
787 033a1d00 Michael Hanselmann
          # Normal case: check again in 5 seconds
788 033a1d00 Michael Hanselmann
          delay = min(5.0, delay)
789 033a1d00 Michael Hanselmann
790 033a1d00 Michael Hanselmann
          if not diskie.CheckListening():
791 033a1d00 Michael Hanselmann
            # Not yet listening, retry soon
792 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
793 033a1d00 Michael Hanselmann
            continue
794 033a1d00 Michael Hanselmann
795 033a1d00 Michael Hanselmann
          if not diskie.CheckConnected():
796 033a1d00 Michael Hanselmann
            # Not yet connected, retry soon
797 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
798 033a1d00 Michael Hanselmann
            continue
799 033a1d00 Michael Hanselmann
800 033a1d00 Michael Hanselmann
        except _ImportExportError, err:
801 033a1d00 Michael Hanselmann
          logging.exception("%s failed", diskie.MODE_TEXT)
802 033a1d00 Michael Hanselmann
          diskie.Finalize(error=str(err))
803 033a1d00 Michael Hanselmann
804 403f5172 Guido Trotter
      if not compat.any(diskie.active for diskie in self._queue):
805 033a1d00 Michael Hanselmann
        break
806 033a1d00 Michael Hanselmann
807 033a1d00 Michael Hanselmann
      # Wait a bit
808 033a1d00 Michael Hanselmann
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
809 033a1d00 Michael Hanselmann
      logging.debug("Waiting for %ss", delay)
810 033a1d00 Michael Hanselmann
      time.sleep(delay)
811 033a1d00 Michael Hanselmann
812 033a1d00 Michael Hanselmann
  def FinalizeAll(self):
813 033a1d00 Michael Hanselmann
    """Finalizes all pending transfers.
814 033a1d00 Michael Hanselmann

815 033a1d00 Michael Hanselmann
    """
816 033a1d00 Michael Hanselmann
    success = True
817 033a1d00 Michael Hanselmann
818 033a1d00 Michael Hanselmann
    for diskie in self._queue:
819 033a1d00 Michael Hanselmann
      success = diskie.Finalize() and success
820 033a1d00 Michael Hanselmann
821 033a1d00 Michael Hanselmann
    return success
822 5d97d6dd Michael Hanselmann
823 5d97d6dd Michael Hanselmann
824 5d97d6dd Michael Hanselmann
class _TransferInstCbBase(ImportExportCbBase):
825 5d97d6dd Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
826 d51ae04c Michael Hanselmann
               dest_node, dest_ip):
827 5d97d6dd Michael Hanselmann
    """Initializes this class.
828 5d97d6dd Michael Hanselmann

829 5d97d6dd Michael Hanselmann
    """
830 5d97d6dd Michael Hanselmann
    ImportExportCbBase.__init__(self)
831 5d97d6dd Michael Hanselmann
832 5d97d6dd Michael Hanselmann
    self.lu = lu
833 5d97d6dd Michael Hanselmann
    self.feedback_fn = feedback_fn
834 5d97d6dd Michael Hanselmann
    self.instance = instance
835 5d97d6dd Michael Hanselmann
    self.timeouts = timeouts
836 5d97d6dd Michael Hanselmann
    self.src_node = src_node
837 5d97d6dd Michael Hanselmann
    self.src_cbs = src_cbs
838 5d97d6dd Michael Hanselmann
    self.dest_node = dest_node
839 5d97d6dd Michael Hanselmann
    self.dest_ip = dest_ip
840 5d97d6dd Michael Hanselmann
841 5d97d6dd Michael Hanselmann
842 5d97d6dd Michael Hanselmann
class _TransferInstSourceCb(_TransferInstCbBase):
843 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
844 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
845 5d97d6dd Michael Hanselmann

846 5d97d6dd Michael Hanselmann
    """
847 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
848 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
849 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
850 5d97d6dd Michael Hanselmann
851 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is sending data on %s" %
852 5d97d6dd Michael Hanselmann
                     (dtp.data.name, ie.node_name))
853 5d97d6dd Michael Hanselmann
854 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, dtp):
855 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
856 1a2e7fe9 Michael Hanselmann

857 1a2e7fe9 Michael Hanselmann
    """
858 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
859 1a2e7fe9 Michael Hanselmann
    if not progress:
860 1a2e7fe9 Michael Hanselmann
      return
861 1a2e7fe9 Michael Hanselmann
862 1a2e7fe9 Michael Hanselmann
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
863 1a2e7fe9 Michael Hanselmann
864 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
865 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
866 5d97d6dd Michael Hanselmann

867 5d97d6dd Michael Hanselmann
    """
868 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
869 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
870 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
871 5d97d6dd Michael Hanselmann
872 5d97d6dd Michael Hanselmann
    if ie.success:
873 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished sending data" % dtp.data.name)
874 5d97d6dd Michael Hanselmann
    else:
875 c9300bb3 Iustin Pop
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
876 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
877 5d97d6dd Michael Hanselmann
878 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
879 5d97d6dd Michael Hanselmann
880 5d97d6dd Michael Hanselmann
    cb = dtp.data.finished_fn
881 5d97d6dd Michael Hanselmann
    if cb:
882 5d97d6dd Michael Hanselmann
      cb()
883 5d97d6dd Michael Hanselmann
884 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
885 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
886 5d97d6dd Michael Hanselmann
    if dtp.dest_import and not ie.success:
887 5d97d6dd Michael Hanselmann
      dtp.dest_import.Abort()
888 5d97d6dd Michael Hanselmann
889 5d97d6dd Michael Hanselmann
890 5d97d6dd Michael Hanselmann
class _TransferInstDestCb(_TransferInstCbBase):
891 5e26c4d9 Iustin Pop
  def ReportListening(self, ie, dtp, component):
892 5d97d6dd Michael Hanselmann
    """Called when daemon started listening.
893 5d97d6dd Michael Hanselmann

894 5d97d6dd Michael Hanselmann
    """
895 5d97d6dd Michael Hanselmann
    assert self.src_cbs
896 5d97d6dd Michael Hanselmann
    assert dtp.src_export is None
897 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
898 d51ae04c Michael Hanselmann
    assert dtp.export_opts
899 5d97d6dd Michael Hanselmann
900 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
901 5d97d6dd Michael Hanselmann
902 5d97d6dd Michael Hanselmann
    # Start export on source node
903 d51ae04c Michael Hanselmann
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
904 5e26c4d9 Iustin Pop
                    self.dest_ip, ie.listen_port, self.instance,
905 5e26c4d9 Iustin Pop
                    component, dtp.data.src_io, dtp.data.src_ioargs,
906 5d97d6dd Michael Hanselmann
                    self.timeouts, self.src_cbs, private=dtp)
907 5d97d6dd Michael Hanselmann
    ie.loop.Add(de)
908 5d97d6dd Michael Hanselmann
909 5d97d6dd Michael Hanselmann
    dtp.src_export = de
910 5d97d6dd Michael Hanselmann
911 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
912 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
913 5d97d6dd Michael Hanselmann

914 5d97d6dd Michael Hanselmann
    """
915 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is receiving data on %s" %
916 5d97d6dd Michael Hanselmann
                     (dtp.data.name, self.dest_node))
917 5d97d6dd Michael Hanselmann
918 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
919 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
920 5d97d6dd Michael Hanselmann

921 5d97d6dd Michael Hanselmann
    """
922 5d97d6dd Michael Hanselmann
    if ie.success:
923 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
924 5d97d6dd Michael Hanselmann
    else:
925 c9300bb3 Iustin Pop
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
926 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
927 5d97d6dd Michael Hanselmann
928 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
929 5d97d6dd Michael Hanselmann
930 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
931 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
932 5d97d6dd Michael Hanselmann
    if dtp.src_export and not ie.success:
933 5d97d6dd Michael Hanselmann
      dtp.src_export.Abort()
934 5d97d6dd Michael Hanselmann
935 5d97d6dd Michael Hanselmann
936 5d97d6dd Michael Hanselmann
class DiskTransfer(object):
937 5d97d6dd Michael Hanselmann
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
938 5d97d6dd Michael Hanselmann
               finished_fn):
939 5d97d6dd Michael Hanselmann
    """Initializes this class.
940 5d97d6dd Michael Hanselmann

941 5d97d6dd Michael Hanselmann
    @type name: string
942 5d97d6dd Michael Hanselmann
    @param name: User-visible name for this transfer (e.g. "disk/0")
943 5d97d6dd Michael Hanselmann
    @param src_io: Source I/O type
944 5d97d6dd Michael Hanselmann
    @param src_ioargs: Source I/O arguments
945 5d97d6dd Michael Hanselmann
    @param dest_io: Destination I/O type
946 5d97d6dd Michael Hanselmann
    @param dest_ioargs: Destination I/O arguments
947 5d97d6dd Michael Hanselmann
    @type finished_fn: callable
948 5d97d6dd Michael Hanselmann
    @param finished_fn: Function called once transfer has finished
949 5d97d6dd Michael Hanselmann

950 5d97d6dd Michael Hanselmann
    """
951 5d97d6dd Michael Hanselmann
    self.name = name
952 5d97d6dd Michael Hanselmann
953 5d97d6dd Michael Hanselmann
    self.src_io = src_io
954 5d97d6dd Michael Hanselmann
    self.src_ioargs = src_ioargs
955 5d97d6dd Michael Hanselmann
956 5d97d6dd Michael Hanselmann
    self.dest_io = dest_io
957 5d97d6dd Michael Hanselmann
    self.dest_ioargs = dest_ioargs
958 5d97d6dd Michael Hanselmann
959 5d97d6dd Michael Hanselmann
    self.finished_fn = finished_fn
960 5d97d6dd Michael Hanselmann
961 5d97d6dd Michael Hanselmann
962 5d97d6dd Michael Hanselmann
class _DiskTransferPrivate(object):
963 d51ae04c Michael Hanselmann
  def __init__(self, data, success, export_opts):
964 5d97d6dd Michael Hanselmann
    """Initializes this class.
965 5d97d6dd Michael Hanselmann

966 5d97d6dd Michael Hanselmann
    @type data: L{DiskTransfer}
967 5d97d6dd Michael Hanselmann
    @type success: bool
968 5d97d6dd Michael Hanselmann

969 5d97d6dd Michael Hanselmann
    """
970 5d97d6dd Michael Hanselmann
    self.data = data
971 d51ae04c Michael Hanselmann
    self.success = success
972 d51ae04c Michael Hanselmann
    self.export_opts = export_opts
973 5d97d6dd Michael Hanselmann
974 5d97d6dd Michael Hanselmann
    self.src_export = None
975 5d97d6dd Michael Hanselmann
    self.dest_import = None
976 5d97d6dd Michael Hanselmann
977 5d97d6dd Michael Hanselmann
  def RecordResult(self, success):
978 5d97d6dd Michael Hanselmann
    """Updates the status.
979 5d97d6dd Michael Hanselmann

980 5d97d6dd Michael Hanselmann
    One failed part will cause the whole transfer to fail.
981 5d97d6dd Michael Hanselmann

982 5d97d6dd Michael Hanselmann
    """
983 5d97d6dd Michael Hanselmann
    self.success = self.success and success
984 5d97d6dd Michael Hanselmann
985 5d97d6dd Michael Hanselmann
986 d51ae04c Michael Hanselmann
def _GetInstDiskMagic(base, instance_name, index):
987 d51ae04c Michael Hanselmann
  """Computes the magic value for a disk export or import.
988 d51ae04c Michael Hanselmann

989 d51ae04c Michael Hanselmann
  @type base: string
990 d51ae04c Michael Hanselmann
  @param base: Random seed value (can be the same for all disks of a transfer)
991 d51ae04c Michael Hanselmann
  @type instance_name: string
992 d51ae04c Michael Hanselmann
  @param instance_name: Name of instance
993 d51ae04c Michael Hanselmann
  @type index: number
994 d51ae04c Michael Hanselmann
  @param index: Disk index
995 d51ae04c Michael Hanselmann

996 d51ae04c Michael Hanselmann
  """
997 d51ae04c Michael Hanselmann
  h = compat.sha1_hash()
998 d51ae04c Michael Hanselmann
  h.update(str(constants.RIE_VERSION))
999 d51ae04c Michael Hanselmann
  h.update(base)
1000 d51ae04c Michael Hanselmann
  h.update(instance_name)
1001 d51ae04c Michael Hanselmann
  h.update(str(index))
1002 d51ae04c Michael Hanselmann
  return h.hexdigest()
1003 d51ae04c Michael Hanselmann
1004 d51ae04c Michael Hanselmann
1005 5d97d6dd Michael Hanselmann
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1006 5d97d6dd Michael Hanselmann
                         instance, all_transfers):
1007 5d97d6dd Michael Hanselmann
  """Transfers an instance's data from one node to another.
1008 5d97d6dd Michael Hanselmann

1009 5d97d6dd Michael Hanselmann
  @param lu: Logical unit instance
1010 5d97d6dd Michael Hanselmann
  @param feedback_fn: Feedback function
1011 5d97d6dd Michael Hanselmann
  @type src_node: string
1012 5d97d6dd Michael Hanselmann
  @param src_node: Source node name
1013 5d97d6dd Michael Hanselmann
  @type dest_node: string
1014 5d97d6dd Michael Hanselmann
  @param dest_node: Destination node name
1015 5d97d6dd Michael Hanselmann
  @type dest_ip: string
1016 5d97d6dd Michael Hanselmann
  @param dest_ip: IP address of destination node
1017 5d97d6dd Michael Hanselmann
  @type instance: L{objects.Instance}
1018 5d97d6dd Michael Hanselmann
  @param instance: Instance object
1019 5d97d6dd Michael Hanselmann
  @type all_transfers: list of L{DiskTransfer} instances
1020 5d97d6dd Michael Hanselmann
  @param all_transfers: List of all disk transfers to be made
1021 5d97d6dd Michael Hanselmann
  @rtype: list
1022 5d97d6dd Michael Hanselmann
  @return: List with a boolean (True=successful, False=failed) for success for
1023 5d97d6dd Michael Hanselmann
           each transfer
1024 5d97d6dd Michael Hanselmann

1025 5d97d6dd Michael Hanselmann
  """
1026 5bb95572 Michael Hanselmann
  # Disable compression for all moves as these are all within the same cluster
1027 5bb95572 Michael Hanselmann
  compress = constants.IEC_NONE
1028 a5310c2a Michael Hanselmann
1029 a5310c2a Michael Hanselmann
  logging.debug("Source node %s, destination node %s, compression '%s'",
1030 a5310c2a Michael Hanselmann
                src_node, dest_node, compress)
1031 a5310c2a Michael Hanselmann
1032 5d97d6dd Michael Hanselmann
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1033 5d97d6dd Michael Hanselmann
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1034 d51ae04c Michael Hanselmann
                                  src_node, None, dest_node, dest_ip)
1035 5d97d6dd Michael Hanselmann
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1036 d51ae04c Michael Hanselmann
                                 src_node, src_cbs, dest_node, dest_ip)
1037 5d97d6dd Michael Hanselmann
1038 5d97d6dd Michael Hanselmann
  all_dtp = []
1039 5d97d6dd Michael Hanselmann
1040 d51ae04c Michael Hanselmann
  base_magic = utils.GenerateSecret(6)
1041 d51ae04c Michael Hanselmann
1042 5d97d6dd Michael Hanselmann
  ieloop = ImportExportLoop(lu)
1043 5d97d6dd Michael Hanselmann
  try:
1044 d51ae04c Michael Hanselmann
    for idx, transfer in enumerate(all_transfers):
1045 5d97d6dd Michael Hanselmann
      if transfer:
1046 5d97d6dd Michael Hanselmann
        feedback_fn("Exporting %s from %s to %s" %
1047 5d97d6dd Michael Hanselmann
                    (transfer.name, src_node, dest_node))
1048 5d97d6dd Michael Hanselmann
1049 d51ae04c Michael Hanselmann
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1050 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1051 d51ae04c Michael Hanselmann
                                           compress=compress, magic=magic)
1052 d51ae04c Michael Hanselmann
1053 d51ae04c Michael Hanselmann
        dtp = _DiskTransferPrivate(transfer, True, opts)
1054 5d97d6dd Michael Hanselmann
1055 5e26c4d9 Iustin Pop
        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
1056 5d97d6dd Michael Hanselmann
                        transfer.dest_io, transfer.dest_ioargs,
1057 5d97d6dd Michael Hanselmann
                        timeouts, dest_cbs, private=dtp)
1058 5d97d6dd Michael Hanselmann
        ieloop.Add(di)
1059 5d97d6dd Michael Hanselmann
1060 5d97d6dd Michael Hanselmann
        dtp.dest_import = di
1061 5d97d6dd Michael Hanselmann
      else:
1062 85b3901b Michael Hanselmann
        dtp = _DiskTransferPrivate(None, False, None)
1063 5d97d6dd Michael Hanselmann
1064 5d97d6dd Michael Hanselmann
      all_dtp.append(dtp)
1065 5d97d6dd Michael Hanselmann
1066 5d97d6dd Michael Hanselmann
    ieloop.Run()
1067 5d97d6dd Michael Hanselmann
  finally:
1068 5d97d6dd Michael Hanselmann
    ieloop.FinalizeAll()
1069 5d97d6dd Michael Hanselmann
1070 5d97d6dd Michael Hanselmann
  assert len(all_dtp) == len(all_transfers)
1071 403f5172 Guido Trotter
  assert compat.all((dtp.src_export is None or
1072 5d97d6dd Michael Hanselmann
                      dtp.src_export.success is not None) and
1073 5d97d6dd Michael Hanselmann
                     (dtp.dest_import is None or
1074 5d97d6dd Michael Hanselmann
                      dtp.dest_import.success is not None)
1075 403f5172 Guido Trotter
                     for dtp in all_dtp), \
1076 5d97d6dd Michael Hanselmann
         "Not all imports/exports are finalized"
1077 5d97d6dd Michael Hanselmann
1078 5d97d6dd Michael Hanselmann
  return [bool(dtp.success) for dtp in all_dtp]
1079 387794f8 Michael Hanselmann
1080 387794f8 Michael Hanselmann
1081 4a96f1d1 Michael Hanselmann
class _RemoteExportCb(ImportExportCbBase):
1082 4a96f1d1 Michael Hanselmann
  def __init__(self, feedback_fn, disk_count):
1083 4a96f1d1 Michael Hanselmann
    """Initializes this class.
1084 4a96f1d1 Michael Hanselmann

1085 4a96f1d1 Michael Hanselmann
    """
1086 4a96f1d1 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1087 4a96f1d1 Michael Hanselmann
    self._feedback_fn = feedback_fn
1088 4a96f1d1 Michael Hanselmann
    self._dresults = [None] * disk_count
1089 4a96f1d1 Michael Hanselmann
1090 4a96f1d1 Michael Hanselmann
  @property
1091 4a96f1d1 Michael Hanselmann
  def disk_results(self):
1092 4a96f1d1 Michael Hanselmann
    """Returns per-disk results.
1093 4a96f1d1 Michael Hanselmann

1094 4a96f1d1 Michael Hanselmann
    """
1095 4a96f1d1 Michael Hanselmann
    return self._dresults
1096 4a96f1d1 Michael Hanselmann
1097 4a96f1d1 Michael Hanselmann
  def ReportConnected(self, ie, private):
1098 4a96f1d1 Michael Hanselmann
    """Called when a connection has been established.
1099 4a96f1d1 Michael Hanselmann

1100 4a96f1d1 Michael Hanselmann
    """
1101 4a96f1d1 Michael Hanselmann
    (idx, _) = private
1102 4a96f1d1 Michael Hanselmann
1103 4a96f1d1 Michael Hanselmann
    self._feedback_fn("Disk %s is now sending data" % idx)
1104 4a96f1d1 Michael Hanselmann
1105 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
1106 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
1107 1a2e7fe9 Michael Hanselmann

1108 1a2e7fe9 Michael Hanselmann
    """
1109 1a2e7fe9 Michael Hanselmann
    (idx, _) = private
1110 1a2e7fe9 Michael Hanselmann
1111 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
1112 1a2e7fe9 Michael Hanselmann
    if not progress:
1113 1a2e7fe9 Michael Hanselmann
      return
1114 1a2e7fe9 Michael Hanselmann
1115 1a2e7fe9 Michael Hanselmann
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1116 1a2e7fe9 Michael Hanselmann
1117 4a96f1d1 Michael Hanselmann
  def ReportFinished(self, ie, private):
1118 4a96f1d1 Michael Hanselmann
    """Called when a transfer has finished.
1119 4a96f1d1 Michael Hanselmann

1120 4a96f1d1 Michael Hanselmann
    """
1121 4a96f1d1 Michael Hanselmann
    (idx, finished_fn) = private
1122 4a96f1d1 Michael Hanselmann
1123 4a96f1d1 Michael Hanselmann
    if ie.success:
1124 4a96f1d1 Michael Hanselmann
      self._feedback_fn("Disk %s finished sending data" % idx)
1125 4a96f1d1 Michael Hanselmann
    else:
1126 c9300bb3 Iustin Pop
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1127 4a96f1d1 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1128 4a96f1d1 Michael Hanselmann
1129 4a96f1d1 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1130 4a96f1d1 Michael Hanselmann
1131 4a96f1d1 Michael Hanselmann
    if finished_fn:
1132 4a96f1d1 Michael Hanselmann
      finished_fn()
1133 4a96f1d1 Michael Hanselmann
1134 4a96f1d1 Michael Hanselmann
1135 387794f8 Michael Hanselmann
class ExportInstanceHelper:
1136 387794f8 Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance):
1137 387794f8 Michael Hanselmann
    """Initializes this class.
1138 387794f8 Michael Hanselmann

1139 387794f8 Michael Hanselmann
    @param lu: Logical unit instance
1140 387794f8 Michael Hanselmann
    @param feedback_fn: Feedback function
1141 387794f8 Michael Hanselmann
    @type instance: L{objects.Instance}
1142 387794f8 Michael Hanselmann
    @param instance: Instance object
1143 387794f8 Michael Hanselmann

1144 387794f8 Michael Hanselmann
    """
1145 387794f8 Michael Hanselmann
    self._lu = lu
1146 387794f8 Michael Hanselmann
    self._feedback_fn = feedback_fn
1147 387794f8 Michael Hanselmann
    self._instance = instance
1148 387794f8 Michael Hanselmann
1149 387794f8 Michael Hanselmann
    self._snap_disks = []
1150 387794f8 Michael Hanselmann
    self._removed_snaps = [False] * len(instance.disks)
1151 387794f8 Michael Hanselmann
1152 387794f8 Michael Hanselmann
  def CreateSnapshots(self):
1153 387794f8 Michael Hanselmann
    """Creates an LVM snapshot for every disk of the instance.
1154 387794f8 Michael Hanselmann

1155 387794f8 Michael Hanselmann
    """
1156 387794f8 Michael Hanselmann
    assert not self._snap_disks
1157 387794f8 Michael Hanselmann
1158 387794f8 Michael Hanselmann
    instance = self._instance
1159 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1160 387794f8 Michael Hanselmann
1161 387794f8 Michael Hanselmann
    for idx, disk in enumerate(instance.disks):
1162 387794f8 Michael Hanselmann
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1163 387794f8 Michael Hanselmann
                        (idx, src_node))
1164 387794f8 Michael Hanselmann
1165 387794f8 Michael Hanselmann
      # result.payload will be a snapshot of an lvm leaf of the one we
1166 387794f8 Michael Hanselmann
      # passed
1167 62bfbc7d René Nussbaumer
      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1168 800ac399 Iustin Pop
      new_dev = False
1169 387794f8 Michael Hanselmann
      msg = result.fail_msg
1170 387794f8 Michael Hanselmann
      if msg:
1171 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1172 387794f8 Michael Hanselmann
                            idx, src_node, msg)
1173 800ac399 Iustin Pop
      elif (not isinstance(result.payload, (tuple, list)) or
1174 800ac399 Iustin Pop
            len(result.payload) != 2):
1175 800ac399 Iustin Pop
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1176 800ac399 Iustin Pop
                            " result '%s'", idx, src_node, result.payload)
1177 387794f8 Michael Hanselmann
      else:
1178 800ac399 Iustin Pop
        disk_id = tuple(result.payload)
1179 bc5d0215 Andrea Spadaccini
        disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
1180 387794f8 Michael Hanselmann
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1181 387794f8 Michael Hanselmann
                               logical_id=disk_id, physical_id=disk_id,
1182 bc5d0215 Andrea Spadaccini
                               iv_name=disk.iv_name,
1183 bc5d0215 Andrea Spadaccini
                               params=disk_params)
1184 387794f8 Michael Hanselmann
1185 387794f8 Michael Hanselmann
      self._snap_disks.append(new_dev)
1186 387794f8 Michael Hanselmann
1187 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1188 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(instance.disks)
1189 387794f8 Michael Hanselmann
1190 387794f8 Michael Hanselmann
  def _RemoveSnapshot(self, disk_index):
1191 387794f8 Michael Hanselmann
    """Removes an LVM snapshot.
1192 387794f8 Michael Hanselmann

1193 387794f8 Michael Hanselmann
    @type disk_index: number
1194 387794f8 Michael Hanselmann
    @param disk_index: Index of the snapshot to be removed
1195 387794f8 Michael Hanselmann

1196 387794f8 Michael Hanselmann
    """
1197 387794f8 Michael Hanselmann
    disk = self._snap_disks[disk_index]
1198 387794f8 Michael Hanselmann
    if disk and not self._removed_snaps[disk_index]:
1199 387794f8 Michael Hanselmann
      src_node = self._instance.primary_node
1200 387794f8 Michael Hanselmann
1201 387794f8 Michael Hanselmann
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1202 387794f8 Michael Hanselmann
                        (disk_index, src_node))
1203 387794f8 Michael Hanselmann
1204 387794f8 Michael Hanselmann
      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1205 387794f8 Michael Hanselmann
      if result.fail_msg:
1206 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1207 387794f8 Michael Hanselmann
                            " %s: %s", disk_index, src_node, result.fail_msg)
1208 387794f8 Michael Hanselmann
      else:
1209 387794f8 Michael Hanselmann
        self._removed_snaps[disk_index] = True
1210 387794f8 Michael Hanselmann
1211 387794f8 Michael Hanselmann
  def LocalExport(self, dest_node):
1212 387794f8 Michael Hanselmann
    """Intra-cluster instance export.
1213 387794f8 Michael Hanselmann

1214 387794f8 Michael Hanselmann
    @type dest_node: L{objects.Node}
1215 387794f8 Michael Hanselmann
    @param dest_node: Destination node
1216 387794f8 Michael Hanselmann

1217 387794f8 Michael Hanselmann
    """
1218 387794f8 Michael Hanselmann
    instance = self._instance
1219 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1220 387794f8 Michael Hanselmann
1221 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1222 387794f8 Michael Hanselmann
1223 387794f8 Michael Hanselmann
    transfers = []
1224 387794f8 Michael Hanselmann
1225 387794f8 Michael Hanselmann
    for idx, dev in enumerate(self._snap_disks):
1226 387794f8 Michael Hanselmann
      if not dev:
1227 387794f8 Michael Hanselmann
        transfers.append(None)
1228 387794f8 Michael Hanselmann
        continue
1229 387794f8 Michael Hanselmann
1230 387794f8 Michael Hanselmann
      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1231 387794f8 Michael Hanselmann
                            dev.physical_id[1])
1232 387794f8 Michael Hanselmann
1233 387794f8 Michael Hanselmann
      finished_fn = compat.partial(self._TransferFinished, idx)
1234 387794f8 Michael Hanselmann
1235 387794f8 Michael Hanselmann
      # FIXME: pass debug option from opcode to backend
1236 387794f8 Michael Hanselmann
      dt = DiskTransfer("snapshot/%s" % idx,
1237 387794f8 Michael Hanselmann
                        constants.IEIO_SCRIPT, (dev, idx),
1238 387794f8 Michael Hanselmann
                        constants.IEIO_FILE, (path, ),
1239 387794f8 Michael Hanselmann
                        finished_fn)
1240 387794f8 Michael Hanselmann
      transfers.append(dt)
1241 387794f8 Michael Hanselmann
1242 387794f8 Michael Hanselmann
    # Actually export data
1243 387794f8 Michael Hanselmann
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1244 387794f8 Michael Hanselmann
                                    src_node, dest_node.name,
1245 387794f8 Michael Hanselmann
                                    dest_node.secondary_ip,
1246 387794f8 Michael Hanselmann
                                    instance, transfers)
1247 387794f8 Michael Hanselmann
1248 387794f8 Michael Hanselmann
    assert len(dresults) == len(instance.disks)
1249 387794f8 Michael Hanselmann
1250 387794f8 Michael Hanselmann
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1251 387794f8 Michael Hanselmann
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1252 387794f8 Michael Hanselmann
                                               self._snap_disks)
1253 387794f8 Michael Hanselmann
    msg = result.fail_msg
1254 387794f8 Michael Hanselmann
    fin_resu = not msg
1255 387794f8 Michael Hanselmann
    if msg:
1256 387794f8 Michael Hanselmann
      self._lu.LogWarning("Could not finalize export for instance %s"
1257 387794f8 Michael Hanselmann
                          " on node %s: %s", instance.name, dest_node.name, msg)
1258 387794f8 Michael Hanselmann
1259 387794f8 Michael Hanselmann
    return (fin_resu, dresults)
1260 387794f8 Michael Hanselmann
1261 d51ae04c Michael Hanselmann
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1262 4a96f1d1 Michael Hanselmann
    """Inter-cluster instance export.
1263 4a96f1d1 Michael Hanselmann

1264 4a96f1d1 Michael Hanselmann
    @type disk_info: list
1265 4a96f1d1 Michael Hanselmann
    @param disk_info: Per-disk destination information
1266 d51ae04c Michael Hanselmann
    @type key_name: string
1267 d51ae04c Michael Hanselmann
    @param key_name: Name of X509 key to use
1268 d51ae04c Michael Hanselmann
    @type dest_ca_pem: string
1269 d51ae04c Michael Hanselmann
    @param dest_ca_pem: Destination X509 CA in PEM format
1270 4a96f1d1 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
1271 4a96f1d1 Michael Hanselmann
    @param timeouts: Timeouts for this import
1272 4a96f1d1 Michael Hanselmann

1273 4a96f1d1 Michael Hanselmann
    """
1274 4a96f1d1 Michael Hanselmann
    instance = self._instance
1275 4a96f1d1 Michael Hanselmann
1276 4a96f1d1 Michael Hanselmann
    assert len(disk_info) == len(instance.disks)
1277 4a96f1d1 Michael Hanselmann
1278 4a96f1d1 Michael Hanselmann
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1279 4a96f1d1 Michael Hanselmann
1280 4a96f1d1 Michael Hanselmann
    ieloop = ImportExportLoop(self._lu)
1281 4a96f1d1 Michael Hanselmann
    try:
1282 d51ae04c Michael Hanselmann
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1283 d51ae04c Michael Hanselmann
                                                           disk_info)):
1284 ba5619c2 Michael Hanselmann
        # Decide whether to use IPv6
1285 ba5619c2 Michael Hanselmann
        ipv6 = netutils.IP6Address.IsValid(host)
1286 ba5619c2 Michael Hanselmann
1287 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=key_name,
1288 d51ae04c Michael Hanselmann
                                           ca_pem=dest_ca_pem,
1289 ba5619c2 Michael Hanselmann
                                           magic=magic, ipv6=ipv6)
1290 d51ae04c Michael Hanselmann
1291 4a96f1d1 Michael Hanselmann
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1292 4a96f1d1 Michael Hanselmann
        finished_fn = compat.partial(self._TransferFinished, idx)
1293 4a96f1d1 Michael Hanselmann
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1294 5e26c4d9 Iustin Pop
                              opts, host, port, instance, "disk%d" % idx,
1295 4a96f1d1 Michael Hanselmann
                              constants.IEIO_SCRIPT, (dev, idx),
1296 4a96f1d1 Michael Hanselmann
                              timeouts, cbs, private=(idx, finished_fn)))
1297 4a96f1d1 Michael Hanselmann
1298 4a96f1d1 Michael Hanselmann
      ieloop.Run()
1299 4a96f1d1 Michael Hanselmann
    finally:
1300 4a96f1d1 Michael Hanselmann
      ieloop.FinalizeAll()
1301 4a96f1d1 Michael Hanselmann
1302 4a96f1d1 Michael Hanselmann
    return (True, cbs.disk_results)
1303 4a96f1d1 Michael Hanselmann
1304 387794f8 Michael Hanselmann
  def _TransferFinished(self, idx):
1305 387794f8 Michael Hanselmann
    """Called once a transfer has finished.
1306 387794f8 Michael Hanselmann

1307 387794f8 Michael Hanselmann
    @type idx: number
1308 387794f8 Michael Hanselmann
    @param idx: Disk index
1309 387794f8 Michael Hanselmann

1310 387794f8 Michael Hanselmann
    """
1311 387794f8 Michael Hanselmann
    logging.debug("Transfer %s finished", idx)
1312 387794f8 Michael Hanselmann
    self._RemoveSnapshot(idx)
1313 387794f8 Michael Hanselmann
1314 387794f8 Michael Hanselmann
  def Cleanup(self):
1315 387794f8 Michael Hanselmann
    """Remove all snapshots.
1316 387794f8 Michael Hanselmann

1317 387794f8 Michael Hanselmann
    """
1318 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(self._instance.disks)
1319 387794f8 Michael Hanselmann
    for idx in range(len(self._instance.disks)):
1320 387794f8 Michael Hanselmann
      self._RemoveSnapshot(idx)
1321 1410fa8d Michael Hanselmann
1322 1410fa8d Michael Hanselmann
1323 9bf56d77 Michael Hanselmann
class _RemoteImportCb(ImportExportCbBase):
1324 9bf56d77 Michael Hanselmann
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1325 9bf56d77 Michael Hanselmann
               external_address):
1326 9bf56d77 Michael Hanselmann
    """Initializes this class.
1327 9bf56d77 Michael Hanselmann

1328 9bf56d77 Michael Hanselmann
    @type cds: string
1329 9bf56d77 Michael Hanselmann
    @param cds: Cluster domain secret
1330 9bf56d77 Michael Hanselmann
    @type x509_cert_pem: string
1331 9bf56d77 Michael Hanselmann
    @param x509_cert_pem: CA used for signing import key
1332 9bf56d77 Michael Hanselmann
    @type disk_count: number
1333 9bf56d77 Michael Hanselmann
    @param disk_count: Number of disks
1334 9bf56d77 Michael Hanselmann
    @type external_address: string
1335 9bf56d77 Michael Hanselmann
    @param external_address: External address of destination node
1336 9bf56d77 Michael Hanselmann

1337 9bf56d77 Michael Hanselmann
    """
1338 9bf56d77 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1339 9bf56d77 Michael Hanselmann
    self._feedback_fn = feedback_fn
1340 9bf56d77 Michael Hanselmann
    self._cds = cds
1341 9bf56d77 Michael Hanselmann
    self._x509_cert_pem = x509_cert_pem
1342 9bf56d77 Michael Hanselmann
    self._disk_count = disk_count
1343 9bf56d77 Michael Hanselmann
    self._external_address = external_address
1344 9bf56d77 Michael Hanselmann
1345 9bf56d77 Michael Hanselmann
    self._dresults = [None] * disk_count
1346 9bf56d77 Michael Hanselmann
    self._daemon_port = [None] * disk_count
1347 9bf56d77 Michael Hanselmann
1348 9bf56d77 Michael Hanselmann
    self._salt = utils.GenerateSecret(8)
1349 9bf56d77 Michael Hanselmann
1350 9bf56d77 Michael Hanselmann
  @property
1351 9bf56d77 Michael Hanselmann
  def disk_results(self):
1352 9bf56d77 Michael Hanselmann
    """Returns per-disk results.
1353 9bf56d77 Michael Hanselmann

1354 9bf56d77 Michael Hanselmann
    """
1355 9bf56d77 Michael Hanselmann
    return self._dresults
1356 9bf56d77 Michael Hanselmann
1357 9bf56d77 Michael Hanselmann
  def _CheckAllListening(self):
1358 9bf56d77 Michael Hanselmann
    """Checks whether all daemons are listening.
1359 9bf56d77 Michael Hanselmann

1360 9bf56d77 Michael Hanselmann
    If all daemons are listening, the information is sent to the client.
1361 9bf56d77 Michael Hanselmann

1362 9bf56d77 Michael Hanselmann
    """
1363 9bf56d77 Michael Hanselmann
    if not compat.all(dp is not None for dp in self._daemon_port):
1364 9bf56d77 Michael Hanselmann
      return
1365 9bf56d77 Michael Hanselmann
1366 9bf56d77 Michael Hanselmann
    host = self._external_address
1367 9bf56d77 Michael Hanselmann
1368 9bf56d77 Michael Hanselmann
    disks = []
1369 d51ae04c Michael Hanselmann
    for idx, (port, magic) in enumerate(self._daemon_port):
1370 9bf56d77 Michael Hanselmann
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1371 d51ae04c Michael Hanselmann
                                               idx, host, port, magic))
1372 9bf56d77 Michael Hanselmann
1373 9bf56d77 Michael Hanselmann
    assert len(disks) == self._disk_count
1374 9bf56d77 Michael Hanselmann
1375 9bf56d77 Michael Hanselmann
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1376 9bf56d77 Michael Hanselmann
      "disks": disks,
1377 9bf56d77 Michael Hanselmann
      "x509_ca": self._x509_cert_pem,
1378 9bf56d77 Michael Hanselmann
      })
1379 9bf56d77 Michael Hanselmann
1380 5e26c4d9 Iustin Pop
  def ReportListening(self, ie, private, _):
1381 9bf56d77 Michael Hanselmann
    """Called when daemon started listening.
1382 9bf56d77 Michael Hanselmann

1383 9bf56d77 Michael Hanselmann
    """
1384 9bf56d77 Michael Hanselmann
    (idx, ) = private
1385 9bf56d77 Michael Hanselmann
1386 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now listening" % idx)
1387 9bf56d77 Michael Hanselmann
1388 9bf56d77 Michael Hanselmann
    assert self._daemon_port[idx] is None
1389 9bf56d77 Michael Hanselmann
1390 d51ae04c Michael Hanselmann
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1391 9bf56d77 Michael Hanselmann
1392 9bf56d77 Michael Hanselmann
    self._CheckAllListening()
1393 9bf56d77 Michael Hanselmann
1394 9bf56d77 Michael Hanselmann
  def ReportConnected(self, ie, private):
1395 9bf56d77 Michael Hanselmann
    """Called when a connection has been established.
1396 9bf56d77 Michael Hanselmann

1397 9bf56d77 Michael Hanselmann
    """
1398 9bf56d77 Michael Hanselmann
    (idx, ) = private
1399 9bf56d77 Michael Hanselmann
1400 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now receiving data" % idx)
1401 9bf56d77 Michael Hanselmann
1402 9bf56d77 Michael Hanselmann
  def ReportFinished(self, ie, private):
1403 9bf56d77 Michael Hanselmann
    """Called when a transfer has finished.
1404 9bf56d77 Michael Hanselmann

1405 9bf56d77 Michael Hanselmann
    """
1406 9bf56d77 Michael Hanselmann
    (idx, ) = private
1407 9bf56d77 Michael Hanselmann
1408 9bf56d77 Michael Hanselmann
    # Daemon is certainly no longer listening
1409 9bf56d77 Michael Hanselmann
    self._daemon_port[idx] = None
1410 9bf56d77 Michael Hanselmann
1411 9bf56d77 Michael Hanselmann
    if ie.success:
1412 9bf56d77 Michael Hanselmann
      self._feedback_fn("Disk %s finished receiving data" % idx)
1413 9bf56d77 Michael Hanselmann
    else:
1414 9bf56d77 Michael Hanselmann
      self._feedback_fn(("Disk %s failed to receive data: %s"
1415 c9300bb3 Iustin Pop
                         " (recent output: %s)") %
1416 9bf56d77 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1417 9bf56d77 Michael Hanselmann
1418 9bf56d77 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1419 9bf56d77 Michael Hanselmann
1420 9bf56d77 Michael Hanselmann
1421 ba5619c2 Michael Hanselmann
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1422 ba5619c2 Michael Hanselmann
                 cds, timeouts):
1423 9bf56d77 Michael Hanselmann
  """Imports an instance from another cluster.
1424 9bf56d77 Michael Hanselmann

1425 9bf56d77 Michael Hanselmann
  @param lu: Logical unit instance
1426 9bf56d77 Michael Hanselmann
  @param feedback_fn: Feedback function
1427 9bf56d77 Michael Hanselmann
  @type instance: L{objects.Instance}
1428 9bf56d77 Michael Hanselmann
  @param instance: Instance object
1429 ba5619c2 Michael Hanselmann
  @type pnode: L{objects.Node}
1430 ba5619c2 Michael Hanselmann
  @param pnode: Primary node of instance as an object
1431 9bf56d77 Michael Hanselmann
  @type source_x509_ca: OpenSSL.crypto.X509
1432 9bf56d77 Michael Hanselmann
  @param source_x509_ca: Import source's X509 CA
1433 9bf56d77 Michael Hanselmann
  @type cds: string
1434 9bf56d77 Michael Hanselmann
  @param cds: Cluster domain secret
1435 9bf56d77 Michael Hanselmann
  @type timeouts: L{ImportExportTimeouts}
1436 9bf56d77 Michael Hanselmann
  @param timeouts: Timeouts for this import
1437 9bf56d77 Michael Hanselmann

1438 9bf56d77 Michael Hanselmann
  """
1439 9bf56d77 Michael Hanselmann
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1440 9bf56d77 Michael Hanselmann
                                                  source_x509_ca)
1441 9bf56d77 Michael Hanselmann
1442 d51ae04c Michael Hanselmann
  magic_base = utils.GenerateSecret(6)
1443 d51ae04c Michael Hanselmann
1444 ba5619c2 Michael Hanselmann
  # Decide whether to use IPv6
1445 ba5619c2 Michael Hanselmann
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1446 ba5619c2 Michael Hanselmann
1447 9bf56d77 Michael Hanselmann
  # Create crypto key
1448 9bf56d77 Michael Hanselmann
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1449 9bf56d77 Michael Hanselmann
                                        constants.RIE_CERT_VALIDITY)
1450 9bf56d77 Michael Hanselmann
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1451 9bf56d77 Michael Hanselmann
1452 9bf56d77 Michael Hanselmann
  (x509_key_name, x509_cert_pem) = result.payload
1453 9bf56d77 Michael Hanselmann
  try:
1454 9bf56d77 Michael Hanselmann
    # Load certificate
1455 9bf56d77 Michael Hanselmann
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1456 9bf56d77 Michael Hanselmann
                                                x509_cert_pem)
1457 9bf56d77 Michael Hanselmann
1458 9bf56d77 Michael Hanselmann
    # Sign certificate
1459 9bf56d77 Michael Hanselmann
    signed_x509_cert_pem = \
1460 9bf56d77 Michael Hanselmann
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1461 9bf56d77 Michael Hanselmann
1462 9bf56d77 Michael Hanselmann
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1463 ba5619c2 Michael Hanselmann
                          len(instance.disks), pnode.primary_ip)
1464 9bf56d77 Michael Hanselmann
1465 9bf56d77 Michael Hanselmann
    ieloop = ImportExportLoop(lu)
1466 9bf56d77 Michael Hanselmann
    try:
1467 9bf56d77 Michael Hanselmann
      for idx, dev in enumerate(instance.disks):
1468 d51ae04c Michael Hanselmann
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1469 d51ae04c Michael Hanselmann
1470 d51ae04c Michael Hanselmann
        # Import daemon options
1471 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1472 d51ae04c Michael Hanselmann
                                           ca_pem=source_ca_pem,
1473 ba5619c2 Michael Hanselmann
                                           magic=magic, ipv6=ipv6)
1474 d51ae04c Michael Hanselmann
1475 eb630f50 Michael Hanselmann
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1476 5e26c4d9 Iustin Pop
                              "disk%d" % idx,
1477 9bf56d77 Michael Hanselmann
                              constants.IEIO_SCRIPT, (dev, idx),
1478 9bf56d77 Michael Hanselmann
                              timeouts, cbs, private=(idx, )))
1479 9bf56d77 Michael Hanselmann
1480 9bf56d77 Michael Hanselmann
      ieloop.Run()
1481 9bf56d77 Michael Hanselmann
    finally:
1482 9bf56d77 Michael Hanselmann
      ieloop.FinalizeAll()
1483 9bf56d77 Michael Hanselmann
  finally:
1484 9bf56d77 Michael Hanselmann
    # Remove crypto key and certificate
1485 9bf56d77 Michael Hanselmann
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1486 9bf56d77 Michael Hanselmann
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1487 9bf56d77 Michael Hanselmann
1488 9bf56d77 Michael Hanselmann
  return cbs.disk_results
1489 9bf56d77 Michael Hanselmann
1490 9bf56d77 Michael Hanselmann
1491 1410fa8d Michael Hanselmann
def _GetImportExportHandshakeMessage(version):
1492 1410fa8d Michael Hanselmann
  """Returns the handshake message for a RIE protocol version.
1493 1410fa8d Michael Hanselmann

1494 1410fa8d Michael Hanselmann
  @type version: number
1495 1410fa8d Michael Hanselmann

1496 1410fa8d Michael Hanselmann
  """
1497 1410fa8d Michael Hanselmann
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1498 1410fa8d Michael Hanselmann
1499 1410fa8d Michael Hanselmann
1500 1410fa8d Michael Hanselmann
def ComputeRemoteExportHandshake(cds):
1501 1410fa8d Michael Hanselmann
  """Computes the remote import/export handshake.
1502 1410fa8d Michael Hanselmann

1503 1410fa8d Michael Hanselmann
  @type cds: string
1504 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1505 1410fa8d Michael Hanselmann

1506 1410fa8d Michael Hanselmann
  """
1507 1410fa8d Michael Hanselmann
  salt = utils.GenerateSecret(8)
1508 1410fa8d Michael Hanselmann
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1509 1410fa8d Michael Hanselmann
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1510 1410fa8d Michael Hanselmann
1511 1410fa8d Michael Hanselmann
1512 1410fa8d Michael Hanselmann
def CheckRemoteExportHandshake(cds, handshake):
1513 1410fa8d Michael Hanselmann
  """Checks the handshake of a remote import/export.
1514 1410fa8d Michael Hanselmann

1515 1410fa8d Michael Hanselmann
  @type cds: string
1516 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1517 1410fa8d Michael Hanselmann
  @type handshake: sequence
1518 1410fa8d Michael Hanselmann
  @param handshake: Handshake sent by remote peer
1519 1410fa8d Michael Hanselmann

1520 1410fa8d Michael Hanselmann
  """
1521 1410fa8d Michael Hanselmann
  try:
1522 1410fa8d Michael Hanselmann
    (version, hmac_digest, hmac_salt) = handshake
1523 1410fa8d Michael Hanselmann
  except (TypeError, ValueError), err:
1524 1410fa8d Michael Hanselmann
    return "Invalid data: %s" % err
1525 1410fa8d Michael Hanselmann
1526 1410fa8d Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1527 1410fa8d Michael Hanselmann
                              hmac_digest, salt=hmac_salt):
1528 1410fa8d Michael Hanselmann
    return "Hash didn't match, clusters don't share the same domain secret"
1529 1410fa8d Michael Hanselmann
1530 1410fa8d Michael Hanselmann
  if version != constants.RIE_VERSION:
1531 1410fa8d Michael Hanselmann
    return ("Clusters don't have the same remote import/export protocol"
1532 1410fa8d Michael Hanselmann
            " (local=%s, remote=%s)" %
1533 1410fa8d Michael Hanselmann
            (constants.RIE_VERSION, version))
1534 1410fa8d Michael Hanselmann
1535 1410fa8d Michael Hanselmann
  return None
1536 4a96f1d1 Michael Hanselmann
1537 4a96f1d1 Michael Hanselmann
1538 d51ae04c Michael Hanselmann
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1539 4a96f1d1 Michael Hanselmann
  """Returns the hashed text for import/export disk information.
1540 4a96f1d1 Michael Hanselmann

1541 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1542 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1543 4a96f1d1 Michael Hanselmann
  @type host: string
1544 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1545 4a96f1d1 Michael Hanselmann
  @type port: number
1546 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1547 d51ae04c Michael Hanselmann
  @type magic: string
1548 d51ae04c Michael Hanselmann
  @param magic: Magic value
1549 4a96f1d1 Michael Hanselmann

1550 4a96f1d1 Michael Hanselmann
  """
1551 d51ae04c Michael Hanselmann
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1552 4a96f1d1 Michael Hanselmann
1553 4a96f1d1 Michael Hanselmann
1554 4a96f1d1 Michael Hanselmann
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1555 4a96f1d1 Michael Hanselmann
  """Verifies received disk information for an export.
1556 4a96f1d1 Michael Hanselmann

1557 4a96f1d1 Michael Hanselmann
  @type cds: string
1558 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1559 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1560 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1561 4a96f1d1 Michael Hanselmann
  @type disk_info: sequence
1562 4a96f1d1 Michael Hanselmann
  @param disk_info: Disk information sent by remote peer
1563 4a96f1d1 Michael Hanselmann

1564 4a96f1d1 Michael Hanselmann
  """
1565 4a96f1d1 Michael Hanselmann
  try:
1566 d51ae04c Michael Hanselmann
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1567 4a96f1d1 Michael Hanselmann
  except (TypeError, ValueError), err:
1568 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("Invalid data: %s" % err)
1569 4a96f1d1 Michael Hanselmann
1570 d51ae04c Michael Hanselmann
  if not (host and port and magic):
1571 d51ae04c Michael Hanselmann
    raise errors.GenericError("Missing destination host, port or magic")
1572 4a96f1d1 Michael Hanselmann
1573 d51ae04c Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1574 4a96f1d1 Michael Hanselmann
1575 4a96f1d1 Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1576 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("HMAC is wrong")
1577 4a96f1d1 Michael Hanselmann
1578 ba5619c2 Michael Hanselmann
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1579 ba5619c2 Michael Hanselmann
    destination = host
1580 ba5619c2 Michael Hanselmann
  else:
1581 ba5619c2 Michael Hanselmann
    destination = netutils.Hostname.GetNormalizedName(host)
1582 ba5619c2 Michael Hanselmann
1583 ba5619c2 Michael Hanselmann
  return (destination,
1584 d51ae04c Michael Hanselmann
          utils.ValidateServiceName(port),
1585 d51ae04c Michael Hanselmann
          magic)
1586 4a96f1d1 Michael Hanselmann
1587 4a96f1d1 Michael Hanselmann
1588 d51ae04c Michael Hanselmann
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1589 4a96f1d1 Michael Hanselmann
  """Computes the signed disk information for a remote import.
1590 4a96f1d1 Michael Hanselmann

1591 4a96f1d1 Michael Hanselmann
  @type cds: string
1592 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1593 4a96f1d1 Michael Hanselmann
  @type salt: string
1594 4a96f1d1 Michael Hanselmann
  @param salt: HMAC salt
1595 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1596 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1597 4a96f1d1 Michael Hanselmann
  @type host: string
1598 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1599 4a96f1d1 Michael Hanselmann
  @type port: number
1600 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1601 d51ae04c Michael Hanselmann
  @type magic: string
1602 d51ae04c Michael Hanselmann
  @param magic: Magic value
1603 4a96f1d1 Michael Hanselmann

1604 4a96f1d1 Michael Hanselmann
  """
1605 d51ae04c Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1606 4a96f1d1 Michael Hanselmann
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1607 d51ae04c Michael Hanselmann
  return (host, port, magic, hmac_digest, salt)