root / lib / masterd / instance.py @ 5d97d6dd
History | View | Annotate | Download (25.8 kB)
1 |
#
|
---|---|
2 |
#
|
3 |
|
4 |
# Copyright (C) 2010 Google Inc.
|
5 |
#
|
6 |
# This program is free software; you can redistribute it and/or modify
|
7 |
# it under the terms of the GNU General Public License as published by
|
8 |
# the Free Software Foundation; either version 2 of the License, or
|
9 |
# (at your option) any later version.
|
10 |
#
|
11 |
# This program is distributed in the hope that it will be useful, but
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 |
# General Public License for more details.
|
15 |
#
|
16 |
# You should have received a copy of the GNU General Public License
|
17 |
# along with this program; if not, write to the Free Software
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 |
# 02110-1301, USA.
|
20 |
|
21 |
|
22 |
"""Instance-related functions and classes for masterd.
|
23 |
|
24 |
"""
|
25 |
|
26 |
import logging |
27 |
import time |
28 |
|
29 |
from ganeti import constants |
30 |
from ganeti import errors |
31 |
from ganeti import compat |
32 |
|
33 |
|
34 |
class _ImportExportError(Exception): |
35 |
"""Local exception to report import/export errors.
|
36 |
|
37 |
"""
|
38 |
|
39 |
|
40 |
class ImportExportTimeouts(object):
  """Bundle of the timeout values applied to an import/export.

  """
  #: Default time the daemon is given to start writing its status file
  DEFAULT_READY_TIMEOUT = 10

  #: Default length of time after which errors cause a hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Default time after which the daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT):
    """Stores the given timeout values.

    @type connect: number
    @param connect: How long establishing the connection may take
    @type listen: number
    @param listen: How long the daemon may take to start listening for
      connections
    @type error: number
    @param error: How long errors may go on before they cause a hard failure
    @type ready: number
    @param ready: How long the daemon may take to become ready

    """
    self.connect = connect
    self.listen = listen
    self.error = error
    self.ready = ready
|
77 |
|
78 |
|
79 |
class ImportExportCbBase(object):
  """Base class for disk import/export callbacks.

  Every callback defined here does nothing; subclasses override the ones
  they are interested in.

  """
  def ReportListening(self, ie, private):
    """Invoked once the daemon has started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object reporting the event
    @param private: Private data which was passed to the import/export object

    """

  def ReportConnected(self, ie, private):
    """Invoked once a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object reporting the event
    @param private: Private data which was passed to the import/export object

    """

  def ReportFinished(self, ie, private):
    """Invoked once a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object reporting the event
    @param private: Private data which was passed to the import/export object

    """
|
109 |
|
110 |
|
111 |
def _TimeoutExpired(epoch, timeout, _time_fn=time.time): |
112 |
"""Checks whether a timeout has expired.
|
113 |
|
114 |
"""
|
115 |
return _time_fn() > (epoch + timeout)
|
116 |
|
117 |
|
118 |
class _DiskImportExportBase(object): |
119 |
MODE_TEXT = None
|
120 |
|
121 |
def __init__(self, lu, node_name, x509_key_name, remote_x509_ca, |
122 |
instance, timeouts, cbs, private=None):
|
123 |
"""Initializes this class.
|
124 |
|
125 |
@param lu: Logical unit instance
|
126 |
@type node_name: string
|
127 |
@param node_name: Node name for import
|
128 |
@type x509_key_name: string
|
129 |
@param x509_key_name: Name of X509 key (None for node daemon key)
|
130 |
@type remote_x509_ca: string
|
131 |
@param remote_x509_ca: Remote peer's CA (None for node daemon certificate)
|
132 |
@type instance: L{objects.Instance}
|
133 |
@param instance: Instance object
|
134 |
@type timeouts: L{ImportExportTimeouts}
|
135 |
@param timeouts: Timeouts for this import
|
136 |
@type cbs: L{ImportExportCbBase}
|
137 |
@param cbs: Callbacks
|
138 |
@param private: Private data for callback functions
|
139 |
|
140 |
"""
|
141 |
assert self.MODE_TEXT |
142 |
|
143 |
self._lu = lu
|
144 |
self.node_name = node_name
|
145 |
self._x509_key_name = x509_key_name
|
146 |
self._remote_x509_ca = remote_x509_ca
|
147 |
self._instance = instance
|
148 |
self._timeouts = timeouts
|
149 |
self._cbs = cbs
|
150 |
self._private = private
|
151 |
|
152 |
# Parent loop
|
153 |
self._loop = None |
154 |
|
155 |
# Timestamps
|
156 |
self._ts_begin = None |
157 |
self._ts_connected = None |
158 |
self._ts_finished = None |
159 |
self._ts_cleanup = None |
160 |
self._ts_last_error = None |
161 |
|
162 |
# Transfer status
|
163 |
self.success = None |
164 |
self.final_message = None |
165 |
|
166 |
# Daemon status
|
167 |
self._daemon_name = None |
168 |
self._daemon = None |
169 |
|
170 |
@property
|
171 |
def recent_output(self): |
172 |
"""Returns the most recent output from the daemon.
|
173 |
|
174 |
"""
|
175 |
if self._daemon: |
176 |
return self._daemon.recent_output |
177 |
|
178 |
return None |
179 |
|
180 |
@property
|
181 |
def active(self): |
182 |
"""Determines whether this transport is still active.
|
183 |
|
184 |
"""
|
185 |
return self.success is None |
186 |
|
187 |
@property
|
188 |
def loop(self): |
189 |
"""Returns parent loop.
|
190 |
|
191 |
@rtype: L{ImportExportLoop}
|
192 |
|
193 |
"""
|
194 |
return self._loop |
195 |
|
196 |
def SetLoop(self, loop): |
197 |
"""Sets the parent loop.
|
198 |
|
199 |
@type loop: L{ImportExportLoop}
|
200 |
|
201 |
"""
|
202 |
if self._loop: |
203 |
raise errors.ProgrammerError("Loop can only be set once") |
204 |
|
205 |
self._loop = loop
|
206 |
|
207 |
def _StartDaemon(self): |
208 |
"""Starts the import/export daemon.
|
209 |
|
210 |
"""
|
211 |
raise NotImplementedError() |
212 |
|
213 |
def CheckDaemon(self): |
214 |
"""Checks whether daemon has been started and if not, starts it.
|
215 |
|
216 |
@rtype: string
|
217 |
@return: Daemon name
|
218 |
|
219 |
"""
|
220 |
assert self._ts_cleanup is None |
221 |
|
222 |
if self._daemon_name is None: |
223 |
assert self._ts_begin is None |
224 |
|
225 |
result = self._StartDaemon()
|
226 |
if result.fail_msg:
|
227 |
raise _ImportExportError("Failed to start %s on %s: %s" % |
228 |
(self.MODE_TEXT, self.node_name, |
229 |
result.fail_msg)) |
230 |
|
231 |
daemon_name = result.payload |
232 |
|
233 |
logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name, |
234 |
self.node_name)
|
235 |
|
236 |
self._ts_begin = time.time()
|
237 |
self._daemon_name = daemon_name
|
238 |
|
239 |
return self._daemon_name |
240 |
|
241 |
def GetDaemonName(self): |
242 |
"""Returns the daemon name.
|
243 |
|
244 |
"""
|
245 |
assert self._daemon_name, "Daemon has not been started" |
246 |
assert self._ts_cleanup is None |
247 |
return self._daemon_name |
248 |
|
249 |
def Abort(self): |
250 |
"""Sends SIGTERM to import/export daemon (if still active).
|
251 |
|
252 |
"""
|
253 |
if self._daemon_name: |
254 |
self._lu.LogWarning("Aborting %s %r on %s", |
255 |
self.MODE_TEXT, self._daemon_name, self.node_name) |
256 |
result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name) |
257 |
if result.fail_msg:
|
258 |
self._lu.LogWarning("Failed to abort %s %r on %s: %s", |
259 |
self.MODE_TEXT, self._daemon_name, |
260 |
self.node_name, result.fail_msg)
|
261 |
return False |
262 |
|
263 |
return True |
264 |
|
265 |
def _SetDaemonData(self, data): |
266 |
"""Internal function for updating status daemon data.
|
267 |
|
268 |
@type data: L{objects.ImportExportStatus}
|
269 |
@param data: Daemon status data
|
270 |
|
271 |
"""
|
272 |
assert self._ts_begin is not None |
273 |
|
274 |
if not data: |
275 |
if _TimeoutExpired(self._ts_begin, self._timeouts.ready): |
276 |
raise _ImportExportError("Didn't become ready after %s seconds" % |
277 |
self._timeouts.ready)
|
278 |
|
279 |
return False |
280 |
|
281 |
self._daemon = data
|
282 |
|
283 |
return True |
284 |
|
285 |
def SetDaemonData(self, success, data): |
286 |
"""Updates daemon status data.
|
287 |
|
288 |
@type success: bool
|
289 |
@param success: Whether fetching data was successful or not
|
290 |
@type data: L{objects.ImportExportStatus}
|
291 |
@param data: Daemon status data
|
292 |
|
293 |
"""
|
294 |
if not success: |
295 |
if self._ts_last_error is None: |
296 |
self._ts_last_error = time.time()
|
297 |
|
298 |
elif _TimeoutExpired(self._ts_last_error, self._timeouts.error): |
299 |
raise _ImportExportError("Too many errors while updating data") |
300 |
|
301 |
return False |
302 |
|
303 |
self._ts_last_error = None |
304 |
|
305 |
return self._SetDaemonData(data) |
306 |
|
307 |
def CheckListening(self): |
308 |
"""Checks whether the daemon is listening.
|
309 |
|
310 |
"""
|
311 |
raise NotImplementedError() |
312 |
|
313 |
def _GetConnectedCheckEpoch(self): |
314 |
"""Returns timeout to calculate connect timeout.
|
315 |
|
316 |
"""
|
317 |
raise NotImplementedError() |
318 |
|
319 |
def CheckConnected(self): |
320 |
"""Checks whether the daemon is connected.
|
321 |
|
322 |
@rtype: bool
|
323 |
@return: Whether the daemon is connected
|
324 |
|
325 |
"""
|
326 |
assert self._daemon, "Daemon status missing" |
327 |
|
328 |
if self._ts_connected is not None: |
329 |
return True |
330 |
|
331 |
if self._daemon.connected: |
332 |
self._ts_connected = time.time()
|
333 |
|
334 |
# TODO: Log remote peer
|
335 |
logging.debug("%s %r on %s is now connected",
|
336 |
self.MODE_TEXT, self._daemon_name, self.node_name) |
337 |
|
338 |
self._cbs.ReportConnected(self, self._private) |
339 |
|
340 |
return True |
341 |
|
342 |
if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect): |
343 |
raise _ImportExportError("Not connected after %s seconds" % |
344 |
self._timeouts.connect)
|
345 |
|
346 |
return False |
347 |
|
348 |
def CheckFinished(self): |
349 |
"""Checks whether the daemon exited.
|
350 |
|
351 |
@rtype: bool
|
352 |
@return: Whether the transfer is finished
|
353 |
|
354 |
"""
|
355 |
assert self._daemon, "Daemon status missing" |
356 |
|
357 |
if self._ts_finished: |
358 |
return True |
359 |
|
360 |
if self._daemon.exit_status is None: |
361 |
return False |
362 |
|
363 |
self._ts_finished = time.time()
|
364 |
|
365 |
self._ReportFinished(self._daemon.exit_status == 0, |
366 |
self._daemon.error_message)
|
367 |
|
368 |
return True |
369 |
|
370 |
def _ReportFinished(self, success, message): |
371 |
"""Transfer is finished or daemon exited.
|
372 |
|
373 |
@type success: bool
|
374 |
@param success: Whether the transfer was successful
|
375 |
@type message: string
|
376 |
@param message: Error message
|
377 |
|
378 |
"""
|
379 |
assert self.success is None |
380 |
|
381 |
self.success = success
|
382 |
self.final_message = message
|
383 |
|
384 |
if success:
|
385 |
logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name, |
386 |
self.node_name)
|
387 |
elif self._daemon_name: |
388 |
self._lu.LogWarning("%s %r on %s failed: %s", |
389 |
self.MODE_TEXT, self._daemon_name, self.node_name, |
390 |
message) |
391 |
else:
|
392 |
self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT, |
393 |
self.node_name, message)
|
394 |
|
395 |
self._cbs.ReportFinished(self, self._private) |
396 |
|
397 |
def _Finalize(self): |
398 |
"""Makes the RPC call to finalize this import/export.
|
399 |
|
400 |
"""
|
401 |
return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name) |
402 |
|
403 |
def Finalize(self, error=None): |
404 |
"""Finalizes this import/export.
|
405 |
|
406 |
"""
|
407 |
assert error or self.success is not None |
408 |
|
409 |
if self._daemon_name: |
410 |
logging.info("Finalizing %s %r on %s",
|
411 |
self.MODE_TEXT, self._daemon_name, self.node_name) |
412 |
|
413 |
result = self._Finalize()
|
414 |
if result.fail_msg:
|
415 |
self._lu.LogWarning("Failed to finalize %s %r on %s: %s", |
416 |
self.MODE_TEXT, self._daemon_name, |
417 |
self.node_name, result.fail_msg)
|
418 |
return False |
419 |
|
420 |
# Daemon is no longer running
|
421 |
self._daemon_name = None |
422 |
self._ts_cleanup = time.time()
|
423 |
|
424 |
if error:
|
425 |
self._ReportFinished(False, error) |
426 |
|
427 |
return True |
428 |
|
429 |
|
430 |
class DiskImport(_DiskImportExportBase):
  """Import side of a disk transfer.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, x509_key_name, source_x509_ca, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Sets up a disk import.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Name of the node running the import
    @type x509_key_name: string
    @param x509_key_name: X509 key name (None for node daemon key)
    @type source_x509_ca: string
    @param source_x509_ca: CA of the remote peer (None for node daemon
      certificate)
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts to apply to this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data handed to the callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name,
                                   x509_key_name, source_x509_ca,
                                   instance, timeouts, cbs, private)

    # Time at which the daemon was first seen listening
    self._ts_listening = None

    self._dest_args = dest_args
    self._dest = dest

  @property
  def listen_port(self):
    """Port the daemon listens on, or None without daemon status.

    """
    status = self._daemon
    if not status:
      return None

    return status.listen_port

  def _StartDaemon(self):
    """Issues the RPC call starting the import daemon.

    """
    rpc = self._lu.rpc
    return rpc.call_import_start(self.node_name,
                                 self._x509_key_name,
                                 self._remote_x509_ca, self._instance,
                                 self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    The listening callback is invoked the first time a port is reported.

    @rtype: bool
    @return: Whether the daemon is listening
    @raise _ImportExportError: If the listen timeout has expired

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is None:
      listen_port = self._daemon.listen_port

      if listen_port is None:
        # No port yet, check how long we have been waiting already
        if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
          raise _ImportExportError("Not listening after %s seconds" %
                                   self._timeouts.listen)

        return False

      self._ts_listening = time.time()

      logging.debug("Import %r on %s is now listening on port %s",
                    self._daemon_name, self.node_name, listen_port)

      self._cbs.ReportListening(self, self._private)

    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time at which the daemon started listening.

    """
    assert self._ts_listening is not None, \
      ("Checking whether an import is connected is only useful"
       " once it's been listening")

    return self._ts_listening
521 |
|
522 |
|
523 |
class DiskExport(_DiskImportExportBase):
  """Export side of a disk transfer.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, x509_key_name, dest_x509_ca,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Sets up a disk export.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Name of the node running the export
    @type x509_key_name: string
    @param x509_key_name: X509 key name (None for node daemon key)
    @type dest_x509_ca: string
    @param dest_x509_ca: CA of the remote peer (None for node daemon
      certificate)
    @type dest_host: string
    @param dest_host: Host name or IP address of the destination
    @type dest_port: number
    @param dest_port: Port number on the destination
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts to apply to this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data handed to the callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name,
                                   x509_key_name, dest_x509_ca,
                                   instance, timeouts, cbs, private)

    # Where the data comes from
    self._source_args = source_args
    self._source = source

    # Where the data goes to
    self._dest_port = dest_port
    self._dest_host = dest_host

  def _StartDaemon(self):
    """Issues the RPC call starting the export daemon.

    """
    rpc = self._lu.rpc
    return rpc.call_export_start(self.node_name, self._x509_key_name,
                                 self._remote_x509_ca,
                                 self._dest_host, self._dest_port,
                                 self._instance, self._source,
                                 self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Always True, as listening only applies to imports

    """
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time at which the daemon was started.

    """
    begin = self._ts_begin
    assert begin is not None

    return begin
585 |
|
586 |
|
587 |
class ImportExportLoop: |
588 |
MIN_DELAY = 1.0
|
589 |
MAX_DELAY = 20.0
|
590 |
|
591 |
def __init__(self, lu): |
592 |
"""Initializes this class.
|
593 |
|
594 |
"""
|
595 |
self._lu = lu
|
596 |
self._queue = []
|
597 |
self._pending_add = []
|
598 |
|
599 |
def Add(self, diskie): |
600 |
"""Adds an import/export object to the loop.
|
601 |
|
602 |
@type diskie: Subclass of L{_DiskImportExportBase}
|
603 |
@param diskie: Import/export object
|
604 |
|
605 |
"""
|
606 |
assert diskie not in self._pending_add |
607 |
assert diskie.loop is None |
608 |
|
609 |
diskie.SetLoop(self)
|
610 |
|
611 |
# Adding new objects to a staging list is necessary, otherwise the main
|
612 |
# loop gets confused if callbacks modify the queue while the main loop is
|
613 |
# iterating over it.
|
614 |
self._pending_add.append(diskie)
|
615 |
|
616 |
@staticmethod
|
617 |
def _CollectDaemonStatus(lu, daemons): |
618 |
"""Collects the status for all import/export daemons.
|
619 |
|
620 |
"""
|
621 |
daemon_status = {} |
622 |
|
623 |
for node_name, names in daemons.iteritems(): |
624 |
result = lu.rpc.call_impexp_status(node_name, names) |
625 |
if result.fail_msg:
|
626 |
lu.LogWarning("Failed to get daemon status on %s: %s",
|
627 |
node_name, result.fail_msg) |
628 |
continue
|
629 |
|
630 |
assert len(names) == len(result.payload) |
631 |
|
632 |
daemon_status[node_name] = dict(zip(names, result.payload)) |
633 |
|
634 |
return daemon_status
|
635 |
|
636 |
@staticmethod
|
637 |
def _GetActiveDaemonNames(queue): |
638 |
"""Gets the names of all active daemons.
|
639 |
|
640 |
"""
|
641 |
result = {} |
642 |
for diskie in queue: |
643 |
if not diskie.active: |
644 |
continue
|
645 |
|
646 |
try:
|
647 |
# Start daemon if necessary
|
648 |
daemon_name = diskie.CheckDaemon() |
649 |
except _ImportExportError, err:
|
650 |
logging.exception("%s failed", diskie.MODE_TEXT)
|
651 |
diskie.Finalize(error=str(err))
|
652 |
continue
|
653 |
|
654 |
result.setdefault(diskie.node_name, []).append(daemon_name) |
655 |
|
656 |
assert len(queue) >= len(result) |
657 |
assert len(queue) >= sum([len(names) for names in result.itervalues()]) |
658 |
|
659 |
logging.debug("daemons=%r", result)
|
660 |
|
661 |
return result
|
662 |
|
663 |
def _AddPendingToQueue(self): |
664 |
"""Adds all pending import/export objects to the internal queue.
|
665 |
|
666 |
"""
|
667 |
assert compat.all(diskie not in self._queue and diskie.loop == self |
668 |
for diskie in self._pending_add) |
669 |
|
670 |
self._queue.extend(self._pending_add) |
671 |
|
672 |
del self._pending_add[:] |
673 |
|
674 |
def Run(self): |
675 |
"""Utility main loop.
|
676 |
|
677 |
"""
|
678 |
while True: |
679 |
self._AddPendingToQueue()
|
680 |
|
681 |
# Collect all active daemon names
|
682 |
daemons = self._GetActiveDaemonNames(self._queue) |
683 |
if not daemons: |
684 |
break
|
685 |
|
686 |
# Collection daemon status data
|
687 |
data = self._CollectDaemonStatus(self._lu, daemons) |
688 |
|
689 |
# Use data
|
690 |
delay = self.MAX_DELAY
|
691 |
for diskie in self._queue: |
692 |
if not diskie.active: |
693 |
continue
|
694 |
|
695 |
try:
|
696 |
try:
|
697 |
all_daemon_data = data[diskie.node_name] |
698 |
except KeyError: |
699 |
result = diskie.SetDaemonData(False, None) |
700 |
else:
|
701 |
result = \ |
702 |
diskie.SetDaemonData(True,
|
703 |
all_daemon_data[diskie.GetDaemonName()]) |
704 |
|
705 |
if not result: |
706 |
# Daemon not yet ready, retry soon
|
707 |
delay = min(3.0, delay) |
708 |
continue
|
709 |
|
710 |
if diskie.CheckFinished():
|
711 |
# Transfer finished
|
712 |
diskie.Finalize() |
713 |
continue
|
714 |
|
715 |
# Normal case: check again in 5 seconds
|
716 |
delay = min(5.0, delay) |
717 |
|
718 |
if not diskie.CheckListening(): |
719 |
# Not yet listening, retry soon
|
720 |
delay = min(1.0, delay) |
721 |
continue
|
722 |
|
723 |
if not diskie.CheckConnected(): |
724 |
# Not yet connected, retry soon
|
725 |
delay = min(1.0, delay) |
726 |
continue
|
727 |
|
728 |
except _ImportExportError, err:
|
729 |
logging.exception("%s failed", diskie.MODE_TEXT)
|
730 |
diskie.Finalize(error=str(err))
|
731 |
|
732 |
if not compat.any([diskie.active for diskie in self._queue]): |
733 |
break
|
734 |
|
735 |
# Wait a bit
|
736 |
delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay)) |
737 |
logging.debug("Waiting for %ss", delay)
|
738 |
time.sleep(delay) |
739 |
|
740 |
def FinalizeAll(self): |
741 |
"""Finalizes all pending transfers.
|
742 |
|
743 |
"""
|
744 |
success = True
|
745 |
|
746 |
for diskie in self._queue: |
747 |
success = diskie.Finalize() and success
|
748 |
|
749 |
return success
|
750 |
|
751 |
|
752 |
class _TransferInstCbBase(ImportExportCbBase):
  """Shared state for the instance data transfer callbacks.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Stores the given parameters as attributes of the same name.

    """
    ImportExportCbBase.__init__(self)

    # General context
    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts

    # Source side
    self.src_node = src_node
    self.src_cbs = src_cbs

    # Destination side
    self.dest_node = dest_node
    self.dest_ip = dest_ip
|
768 |
|
769 |
|
770 |
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the export (source) side of an instance data transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Gives feedback once the export has established its connection.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    message = "%s is sending data on %s" % (dtp.data.name, ie.node_name)
    self.feedback_fn(message)

  def ReportFinished(self, ie, dtp):
    """Handles the end of an export.

    Gives feedback, records the result, invokes the transfer's finished
    function (if any) and aborts the import if the export failed.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      message = "%s finished sending data" % dtp.data.name
    else:
      message = ("%s failed to send data: %s (recent output: %r)" %
                 (dtp.data.name, ie.final_message, ie.recent_output))
    self.feedback_fn(message)

    dtp.RecordResult(ie.success)

    finished_fn = dtp.data.finished_fn
    if finished_fn:
      finished_fn()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import:
      if not ie.success:
        dtp.dest_import.Abort()
806 |
|
807 |
|
808 |
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the import (destination) side of an instance data transfer.

  """
  def ReportListening(self, ie, dtp):
    """Starts the matching export once the import is listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import

    message = "%s is now listening, starting export" % dtp.data.name
    self.feedback_fn(message)

    # The export on the source node connects to the port the import is
    # listening on and is run by the same loop as the import
    export = DiskExport(self.lu, self.src_node, None, None, self.dest_ip,
                        ie.listen_port, self.instance,
                        dtp.data.src_io, dtp.data.src_ioargs,
                        self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(export)

    dtp.src_export = export

  def ReportConnected(self, ie, dtp):
    """Gives feedback once the import has a connection.

    """
    message = "%s is receiving data on %s" % (dtp.data.name, self.dest_node)
    self.feedback_fn(message)

  def ReportFinished(self, ie, dtp):
    """Handles the end of an import.

    Gives feedback, records the result and aborts the export (if there is
    one) if the import failed.

    """
    if ie.success:
      message = "%s finished receiving data" % dtp.data.name
    else:
      message = ("%s failed to receive data: %s (recent output: %r)" %
                 (dtp.data.name, ie.final_message, ie.recent_output))
    self.feedback_fn(message)

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export:
      if not ie.success:
        dtp.src_export.Abort()
851 |
|
852 |
|
853 |
class DiskTransfer(object):
  """Description of a single disk transfer.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Stores the description of the transfer.

    @type name: string
    @param name: Name of this transfer as shown to the user (e.g. "disk/0")
    @param src_io: I/O type of the source
    @param src_ioargs: I/O arguments of the source
    @param dest_io: I/O type of the destination
    @param dest_ioargs: I/O arguments of the destination
    @type finished_fn: callable
    @param finished_fn: Function to call once the transfer has finished

    """
    self.name = name
    self.finished_fn = finished_fn

    # Source
    self.src_io, self.src_ioargs = src_io, src_ioargs

    # Destination
    self.dest_io, self.dest_ioargs = dest_io, dest_ioargs
|
877 |
|
878 |
|
879 |
class _DiskTransferPrivate(object): |
880 |
def __init__(self, data, success): |
881 |
"""Initializes this class.
|
882 |
|
883 |
@type data: L{DiskTransfer}
|
884 |
@type success: bool
|
885 |
|
886 |
"""
|
887 |
self.data = data
|
888 |
|
889 |
self.src_export = None |
890 |
self.dest_import = None |
891 |
|
892 |
self.success = success
|
893 |
|
894 |
def RecordResult(self, success): |
895 |
"""Updates the status.
|
896 |
|
897 |
One failed part will cause the whole transfer to fail.
|
898 |
|
899 |
"""
|
900 |
self.success = self.success and success |
901 |
|
902 |
|
903 |
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Copies the data of an instance from one node to another.

  An import is set up on the destination node for every transfer; the
  matching export is started by the callbacks once the import is listening.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Name of the source node
  @type dest_node: string
  @param dest_node: Name of the destination node
  @type dest_ip: string
  @param dest_ip: IP address of the destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: All disk transfers to be made; entries evaluating to
    false are not transferred and reported as failed
  @rtype: list
  @return: One boolean per transfer (True=successful, False=failed), in the
    order of C{all_transfers}

  """
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)

  # The destination callbacks need the source callbacks for starting exports
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  def _IsDone(ie):
    """Checks whether an import/export is absent or has a final result.

    """
    return ie is None or ie.success is not None

  all_dtp = []

  ieloop = ImportExportLoop(lu)
  try:
    for transfer in all_transfers:
      if not transfer:
        # Nothing to transfer, keep a placeholder marked as failed
        all_dtp.append(_DiskTransferPrivate(None, False))
        continue

      feedback_fn("Exporting %s from %s to %s" %
                  (transfer.name, src_node, dest_node))

      dtp = _DiskTransferPrivate(transfer, True)

      di = DiskImport(lu, dest_node, None, None, instance,
                      transfer.dest_io, transfer.dest_ioargs,
                      timeouts, dest_cbs, private=dtp)
      ieloop.Add(di)

      dtp.dest_import = di

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all([_IsDone(state.src_export) and _IsDone(state.dest_import)
                     for state in all_dtp]), \
         "Not all imports/exports are finalized"

  return [bool(state.success) for state in all_dtp]