#
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Instance-related functions and classes for masterd.

"""

import logging
import time
import OpenSSL

from ganeti import constants
from ganeti import errors
from ganeti import compat
from ganeti import utils
from ganeti import objects


class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  """


class ImportExportTimeouts(object):
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready

    """
    self.error = error
    self.ready = ready
    self.listen = listen
    self.connect = connect
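  # Usage note: callers in this module typically pass only the connect
  # timeout and keep the defaults for the rest, for example (as done by
  # TransferInstanceData() further below):
  #
  #   timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)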


class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  """
  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """


def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
  """Checks whether a timeout has expired.

  """
  return _time_fn() > (epoch + timeout)
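# Example: with an epoch taken 15 seconds in the past and a 10 second
# timeout, _TimeoutExpired(time.time() - 15, 10) returns True, while
# _TimeoutExpired(time.time() - 5, 10) returns False.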


class _DiskImportExportBase(object):
  MODE_TEXT = None

  def __init__(self, lu, node_name, x509_key_name, remote_x509_ca,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import/export
    @type x509_key_name: string
    @param x509_key_name: Name of X509 key (None for node daemon key)
    @type remote_x509_ca: string
    @param remote_x509_ca: Remote peer's CA (None for node daemon certificate)
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import/export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._x509_key_name = x509_key_name
    self._remote_x509_ca = remote_x509_ca
    self._instance = instance
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_error = None

    # Transfer status
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    """
    if self._daemon:
      return self._daemon.recent_output

    return None

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating daemon status data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    assert self._ts_begin is not None

    if not data:
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    if not success:
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns the epoch from which the connect timeout is calculated.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
                   self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    """
    assert error or self.success is not None

    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
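  # Lifecycle note: ImportExportLoop.Run() (below) drives these checks --
  # CheckDaemon() starts the remote daemon, SetDaemonData() feeds it status
  # fetched via RPC, and CheckListening(), CheckConnected() and
  # CheckFinished() are polled until the transfer completes; Finalize()
  # then cleans up the daemon.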


class DiskImport(_DiskImportExportBase):
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, x509_key_name, source_x509_ca, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type x509_key_name: string
    @param x509_key_name: Name of X509 key (None for node daemon key)
    @type source_x509_ca: string
    @param source_x509_ca: Remote peer's CA (None for node daemon certificate)
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name,
                                   x509_key_name, source_x509_ca,
                                   instance, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamps
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name,
                                          self._x509_key_name,
                                          self._remote_x509_ca, self._instance,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import %r on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private)

      return True

    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the timestamp when the daemon started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening


class DiskExport(_DiskImportExportBase):
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, x509_key_name, dest_x509_ca,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for export
    @type x509_key_name: string
    @param x509_key_name: Name of X509 key (None for node daemon key)
    @type dest_x509_ca: string
    @param dest_x509_ca: Remote peer's CA (None for node daemon certificate)
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name,
                                   x509_key_name, dest_x509_ca,
                                   instance, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._x509_key_name,
                                          self._remote_x509_ca,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._source,
                                          self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the timestamp when the daemon was started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
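  # Note: the two sides are asymmetric -- the import daemon (started via
  # call_import_start) opens a listening port, while the export daemon
  # (started via call_export_start) connects out to that host/port, which
  # is why CheckListening() above is a no-op for exports.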


class ImportExportLoop:
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any([diskie.active for diskie in self._queue]):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
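  # Typical usage (see TransferInstanceData() and
  # ExportInstanceHelper.RemoteExport() below):
  #
  #   ieloop = ImportExportLoop(lu)
  #   try:
  #     ieloop.Add(...)   # DiskImport and/or DiskExport objects
  #     ieloop.Run()
  #   finally:
  #     ieloop.FinalizeAll()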


class _TransferInstCbBase(ImportExportCbBase):
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip


class _TransferInstSourceCb(_TransferInstCbBase):
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()


class _TransferInstDestCb(_TransferInstCbBase):
  def ReportListening(self, ie, dtp):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip,
                    ie.listen_port, self.instance,
                    dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()


class DiskTransfer(object):
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name

    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn


class _DiskTransferPrivate(object):
  def __init__(self, data, success):
    """Initializes this class.

    @type data: L{DiskTransfer}
    @type success: bool

    """
    self.data = data

    self.src_export = None
    self.dest_import = None

    self.success = success

  def RecordResult(self, success):
    """Updates the status.

    One failed part will cause the whole transfer to fail.

    """
    self.success = self.success and success
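  # Because results are and-accumulated, a transfer only counts as
  # successful if both legs report success; e.g. RecordResult(True)
  # followed by RecordResult(False) leaves self.success as False.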


def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for each
           transfer

  """
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  ieloop = ImportExportLoop(lu)
  try:
    for transfer in all_transfers:
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        dtp = _DiskTransferPrivate(transfer, True)

        di = DiskImport(lu, dest_node, None, None, instance,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        dtp = _DiskTransferPrivate(None, False)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all([(dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp]), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
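# Illustrative call (hypothetical "lu", "disk", "path" and node variables):
# copying a single disk to a plain file on the destination node could look
# like
#
#   transfers = [DiskTransfer("disk/0", constants.IEIO_SCRIPT, (disk, 0),
#                             constants.IEIO_FILE, (path, ), None)]
#   results = TransferInstanceData(lu, feedback_fn, src_node, dest_node,
#                                  dest_ip, instance, transfers)
#
# "results" then contains one boolean per requested transfer, in order.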


class _RemoteExportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      self._feedback_fn("Disk %s finished sending data" % idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()


class ExportInstanceHelper:
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    self._snap_disks = []
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    vgname = self._lu.cfg.GetVGName()

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
        new_dev = False
      else:
        disk_id = (vgname, result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)
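  # Flow summary: CreateSnapshots() takes an LVM snapshot per disk, the
  # transfers above stream each snapshot (IEIO_SCRIPT source) into a file
  # under EXPORT_DIR on the destination node (IEIO_FILE), and
  # call_finalize_export then completes the export on the destination node.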
  def RemoteExport(self, x509_key_name, dest_x509_ca, disk_info, timeouts):
    """Inter-cluster instance export.

    @type x509_key_name: string
    @param x509_key_name: X509 key name for encrypting data
    @type dest_x509_ca: OpenSSL.crypto.X509
    @param dest_x509_ca: Remote peer X509 CA object
    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    dest_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  dest_x509_ca)

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks,
                                                          disk_info)):
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              x509_key_name, dest_ca_pem, host, port, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)


def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number

  """
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)


def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret

  """
  salt = utils.GenerateSecret(8)
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
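# The tuple produced by ComputeRemoteExportHandshake() is what the remote
# cluster feeds into CheckRemoteExportHandshake() below; when both clusters
# share the same cluster domain secret the check returns None (no error).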


def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  except (TypeError, ValueError), err:
    return "Invalid data: %s" % err

  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None


def _GetRieDiskInfoMessage(disk_index, host, port):
  """Returns the hashed text for import/export disk information.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port

  """
  return "%s:%s:%s" % (disk_index, host, port)


def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer

  """
  try:
    (host, port, hmac_digest, hmac_salt) = disk_info
  except (TypeError, ValueError), err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port):
    raise errors.GenericError("Missing destination host or port")

  msg = _GetRieDiskInfoMessage(disk_index, host, port)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  return (host, port)


def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port

  """
  msg = _GetRieDiskInfoMessage(disk_index, host, port)
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
  return (host, port, hmac_digest, salt)
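# ComputeRemoteImportDiskInfo() is the producing counterpart of
# CheckRemoteExportDiskInfo() above: the importing cluster signs
# (disk_index, host, port) with the shared cluster domain secret, and the
# exporting cluster verifies the HMAC before connecting to that endpoint.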