#
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Instance-related functions and classes for masterd.

"""

import logging
import time
import OpenSSL

from ganeti import constants
from ganeti import errors
from ganeti import compat
from ganeti import utils
from ganeti import objects


class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  """


class ImportExportTimeouts(object):
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT):
    """Initializes this class.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready

    """
    self.error = error
    self.ready = ready
    self.listen = listen
    self.connect = connect
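
# Usage sketch (illustrative only): the connect timeout is the single
# mandatory argument, the remaining timeouts fall back to the DEFAULT_*
# constants above. The value 60 below is an arbitrary example.
#
#   timeouts = ImportExportTimeouts(60, listen=30)
#   assert timeouts.connect == 60
#   assert timeouts.ready == ImportExportTimeouts.DEFAULT_READY_TIMEOUT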


class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  """
  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """


def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
  """Checks whether a timeout has expired.

  """
  return _time_fn() > (epoch + timeout)
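
# Illustrative sketch of the "_time_fn" test hook; the numbers are made up:
#
#   _TimeoutExpired(100, 10, _time_fn=lambda: 105)  # -> False (within timeout)
#   _TimeoutExpired(100, 10, _time_fn=lambda: 111)  # -> True (expired)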


class _DiskImportExportBase(object):
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._opts = opts
    self._instance = instance
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_error = None

    # Transfer status
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    """
    if self._daemon:
      return self._daemon.recent_output

    return None

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating daemon status data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    assert self._ts_begin is not None

    if not data:
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data

    """
    if not success:
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns the epoch from which the connect timeout is calculated.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
                   self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    """
    assert error or self.success is not None

    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)
    return True
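
# The methods above are driven by ImportExportLoop.Run() in roughly this
# order (a simplified sketch of the polling protocol, not executable as-is):
#
#   name = ie.CheckDaemon()            # start daemon, remember its name
#   if ie.SetDaemonData(ok, status):   # status fetched via impexp_status RPC
#     if ie.CheckFinished():           # daemon exited, report success/failure
#       ie.Finalize()
#     elif ie.CheckListening() and ie.CheckConnected():
#       pass                           # transfer in progress, poll again later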


class DiskImport(_DiskImportExportBase):
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamps
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import %r on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private)

      return True

    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time when the daemon started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening


class DiskExport(_DiskImportExportBase):
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for export
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._source,
                                          self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time when the daemon was started.

    """
    assert self._ts_begin is not None

    return self._ts_begin


class ImportExportLoop:
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any([diskie.active for diskie in self._queue]):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success
    return success
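
# Typical driver pattern for the loop above (an illustrative sketch; "lu" and
# "diskie" stand for a logical unit and a DiskImport/DiskExport built
# elsewhere):
#
#   ieloop = ImportExportLoop(lu)
#   try:
#     ieloop.Add(diskie)
#     ieloop.Run()          # polls until all transfers finish or fail
#   finally:
#     ieloop.FinalizeAll()  # always clean up the remote daemons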


class _TransferInstCbBase(ImportExportCbBase):
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip


class _TransferInstSourceCb(_TransferInstCbBase):
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()


class _TransferInstDestCb(_TransferInstCbBase):
  def ReportListening(self, ie, dtp):
    """Called when daemon started listening.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    opts = objects.ImportExportOptions(key_name=None, ca_pem=None)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node, opts, self.dest_ip, ie.listen_port,
                    self.instance, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()


class DiskTransfer(object):
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name

    self.src_io = src_io
    self.src_ioargs = src_ioargs

    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    self.finished_fn = finished_fn


class _DiskTransferPrivate(object):
  def __init__(self, data, success):
    """Initializes this class.

    @type data: L{DiskTransfer}
    @type success: bool

    """
    self.data = data

    self.src_export = None
    self.dest_import = None

    self.success = success

  def RecordResult(self, success):
    """Updates the status.

    One failed part will cause the whole transfer to fail.

    """
    self.success = self.success and success


def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for each
           transfer

  """
  opts = objects.ImportExportOptions(key_name=None, ca_pem=None)
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip)

  all_dtp = []

  ieloop = ImportExportLoop(lu)
  try:
    for transfer in all_transfers:
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        dtp = _DiskTransferPrivate(transfer, True)

        di = DiskImport(lu, dest_node, opts, instance,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        dtp = _DiskTransferPrivate(None, False)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all([(dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp]), \
         "Not all imports/exports are finalized"
  return [bool(dtp.success) for dtp in all_dtp]
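
# Caller-side sketch (illustrative only; "lu", "feedback_fn", "inst",
# "src_node", "dest_node", "dest_ip" and the target path are assumed to be
# provided by the calling code, mirroring how LocalExport below builds its
# transfers):
#
#   dt = DiskTransfer("disk/0",
#                     constants.IEIO_SCRIPT, (inst.disks[0], 0),
#                     constants.IEIO_FILE, ("/some/target/path", ),
#                     None)
#   results = TransferInstanceData(lu, feedback_fn, src_node, dest_node,
#                                  dest_ip, inst, [dt])
#   # results would be e.g. [True] if the single transfer succeeded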


class _RemoteExportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      self._feedback_fn("Disk %s finished sending data" % idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()


class ExportInstanceHelper:
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    self._snap_disks = []
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    vgname = self._lu.cfg.GetVGName()

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
        new_dev = False
      else:
        disk_id = (vgname, result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, opts, disk_info, timeouts):
    """Inter-cluster instance export.

    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks,
                                                          disk_info)):
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
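
# Intra-cluster export flow using the helper above (illustrative sketch;
# "lu", "feedback_fn", "instance" and "dst_node" are assumed to be provided
# by the calling logical unit):
#
#   helper = ExportInstanceHelper(lu, feedback_fn, instance)
#   try:
#     helper.CreateSnapshots()
#     (fin_resu, dresults) = helper.LocalExport(dst_node)
#   finally:
#     helper.Cleanup()   # removes any snapshots that are still around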


class _RemoteImportCb(ImportExportCbBase):
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if not compat.all(dp is not None for dp in self._daemon_port):
      return

    host = self._external_address

    disks = []
    for idx, port in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = ie.listen_port

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %r)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)


def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Import daemon options
    opts = objects.ImportExportOptions(key_name=x509_key_name,
                                       ca_pem=source_ca_pem)

    # Sign certificate
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), instance.primary_node)

    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results


def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number

  """
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)


def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret

  """
  salt = utils.GenerateSecret(8)
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
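
# Handshake round trip (illustrative sketch): the exporting cluster computes
# the tuple, the importing cluster verifies it with the shared cluster domain
# secret. The secret below is a made-up example value.
#
#   cds = "example-cluster-domain-secret"
#   handshake = ComputeRemoteExportHandshake(cds)
#   assert CheckRemoteExportHandshake(cds, handshake) is None
#   assert CheckRemoteExportHandshake("wrong-secret", handshake) is not None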


def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  except (TypeError, ValueError), err:
    return "Invalid data: %s" % err

  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None


def _GetRieDiskInfoMessage(disk_index, host, port):
  """Returns the hashed text for import/export disk information.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port

  """
  return "%s:%s:%s" % (disk_index, host, port)


def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer

  """
  try:
    (host, port, hmac_digest, hmac_salt) = disk_info
  except (TypeError, ValueError), err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port):
    raise errors.GenericError("Missing destination host or port")

  msg = _GetRieDiskInfoMessage(disk_index, host, port)

  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  return (host, port)


def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port

  """
  msg = _GetRieDiskInfoMessage(disk_index, host, port)
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
  return (host, port, hmac_digest, salt)
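
# Signed disk information round trip (illustrative sketch; the secret, host
# and port are made-up example values):
#
#   cds = "example-cluster-domain-secret"
#   info = ComputeRemoteImportDiskInfo(cds, utils.GenerateSecret(8),
#                                      0, "node1.example.com", 1234)
#   # The importing side publishes "info"; the exporting side verifies it:
#   (host, port) = CheckRemoteExportDiskInfo(cds, 0, info)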