#
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Instance-related functions and classes for masterd.

"""

import logging
import time

from ganeti import constants
from ganeti import errors
from ganeti import compat
from ganeti import utils
from ganeti import objects


class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  """


class ImportExportTimeouts(object):
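  """Timeouts for the individual phases of a disk import/export.

  All values are given in seconds.

  """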
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready

    """
    self.error = error
    self.ready = ready
    self.listen = listen
    self.connect = connect


class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  """
  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """


def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
  """Checks whether a timeout has expired.

  """
  return _time_fn() > (epoch + timeout)


class _DiskImportExportBase(object):
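  #: Mode name used in log and error messages ("import" or "export");
  #: must be set by subclasses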
  MODE_TEXT = None

  def __init__(self, lu, node_name, x509_key_name, remote_x509_ca,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import/export
    @type x509_key_name: string
    @param x509_key_name: Name of X509 key (None for node daemon key)
    @type remote_x509_ca: string
    @param remote_x509_ca: Remote peer's CA (None for node daemon certificate)
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import/export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._x509_key_name = x509_key_name
    self._remote_x509_ca = remote_x509_ca
    self._instance = instance
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_error = None

    # Transfer status
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    """
    if self._daemon:
      return self._daemon.recent_output

    return None

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating daemon status data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    assert self._ts_begin is not None

    if not data:
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    if not success:
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns the epoch from which the connect timeout is calculated.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
                   self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    """
    assert error or self.success is not None

    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True


class DiskImport(_DiskImportExportBase):
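  """Disk import, i.e. the receiving side of a transfer.

  The import daemon is started on the target node, listens for an incoming
  connection and writes the data it receives to the given I/O destination.

  """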
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, x509_key_name, source_x509_ca, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type x509_key_name: string
    @param x509_key_name: Name of X509 key (None for node daemon key)
    @type source_x509_ca: string
    @param source_x509_ca: Remote peer's CA (None for node daemon certificate)
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name,
                                   x509_key_name, source_x509_ca,
                                   instance, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamps
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name,
                                          self._x509_key_name,
                                          self._remote_x509_ca, self._instance,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import %r on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private)

      return True

    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time when the daemon started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening


class DiskExport(_DiskImportExportBase):
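  """Disk export, i.e. the sending side of a transfer.

  The export daemon connects to the given host and port and sends the data
  read from the given I/O source.

  """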
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, x509_key_name, dest_x509_ca,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for export
    @type x509_key_name: string
    @param x509_key_name: Name of X509 key (None for node daemon key)
    @type dest_x509_ca: string
    @param dest_x509_ca: Remote peer's CA (None for node daemon certificate)
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name,
                                   x509_key_name, dest_x509_ca,
                                   instance, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._x509_key_name,
                                          self._remote_x509_ca,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._source,
                                          self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time when the daemon was started.

    """
    assert self._ts_begin is not None

    return self._ts_begin


class ImportExportLoop:
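  """Main loop for a set of disk import/export daemons.

  Import/export objects are added through L{Add} (callbacks may add more
  while the loop is running), L{Run} polls the daemons until all transfers
  have finished, and L{FinalizeAll} should always be called afterwards to
  clean up.

  """

  #: Lower and upper bounds, in seconds, for the delay between status polls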
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any([diskie.active for diskie in self._queue]):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success


class _TransferInstCbBase(ImportExportCbBase):
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip


class _TransferInstSourceCb(_TransferInstCbBase):
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()


class _TransferInstDestCb(_TransferInstCbBase):
  def ReportListening(self, ie, dtp):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip,
                    ie.listen_port, self.instance,
                    dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()


class DiskTransfer(object):
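  """Parameters for transferring a single disk.

  """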
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name

    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn


class _DiskTransferPrivate(object):
  def __init__(self, data, success):
    """Initializes this class.

    @type data: L{DiskTransfer}
    @type success: bool

    """
    self.data = data

    self.src_export = None
    self.dest_import = None

    self.success = success

  def RecordResult(self, success):
    """Updates the status.

    One failed part will cause the whole transfer to fail.

    """
    self.success = self.success and success


def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for each
           transfer

  """
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  ieloop = ImportExportLoop(lu)
  try:
    for transfer in all_transfers:
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        dtp = _DiskTransferPrivate(transfer, True)

        di = DiskImport(lu, dest_node, None, None, instance,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        dtp = _DiskTransferPrivate(None, False)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all([(dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp]), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]


class ExportInstanceHelper:
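  """Helper for exporting an instance.

  Creates LVM snapshots of the instance's disks, transfers their contents
  to the export directory on the destination node and removes the
  snapshots again.

  """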
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    self._snap_disks = []
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    vgname = self._lu.cfg.GetVGName()

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
        new_dev = False
      else:
        disk_id = (vgname, result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)


def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number

  """
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)


def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret

  """
  salt = utils.GenerateSecret(8)
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)


def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  except (TypeError, ValueError), err:
    return "Invalid data: %s" % err

  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None