Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ a5310c2a

History | View | Annotate | Download (39.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35

    
36

    
37
class _ImportExportError(Exception):
38
  """Local exception to report import/export errors.
39

40
  """
41

    
42

    
43
class ImportExportTimeouts(object):
  """Bundles the timeouts governing a single import/export daemon.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT):
    """Initializes the timeout values.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready

    """
    self.connect = connect
    self.listen = listen
    self.ready = ready
    self.error = error
80

    
81

    
82
class ImportExportCbBase(object):
  """Base class for disk import/export progress callbacks.

  All hooks are no-ops here; subclasses override the ones they need.

  """
  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
112

    
113

    
114
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
115
  """Checks whether a timeout has expired.
116

117
  """
118
  return _time_fn() > (epoch + timeout)
119

    
120

    
121
class _DiskImportExportBase(object):
  """Base class tracking one disk import/export daemon on one node.

  Records the daemon's lifecycle (started, connected, finished, cleaned
  up) via timestamps, enforces the configured L{ImportExportTimeouts},
  and reports progress through L{ImportExportCbBase} callbacks.
  Subclasses must set L{MODE_TEXT} and implement L{_StartDaemon},
  L{CheckListening} and L{_GetConnectedCheckEpoch}.

  """
  #: Human-readable mode ("import" or "export"); set by subclasses
  MODE_TEXT = None

  def __init__(self, lu, node_name, opts,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._opts = opts
    self._instance = instance
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Parent loop; set exactly once via SetLoop
    self._loop = None

    # Lifecycle timestamps; each starts unset and is filled in when the
    # corresponding state is reached
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_error = None

    # Transfer status; success stays None while the transfer is active
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    Returns C{None} while no status data has been received yet.

    """
    if self._daemon:
      return self._daemon.recent_output

    return None

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}
    @raise errors.ProgrammerError: If a loop was already set

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Must be implemented by subclasses; expected to return an RPC result.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name
    @raise _ImportExportError: If the daemon could not be started

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      # Record the start time; timeouts for "ready" and "listen" are
      # measured from this point
      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: Whether the abort RPC succeeded (also C{True} if no daemon
             was running)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether data was available (daemon became ready)
    @raise _ImportExportError: If the ready timeout expired

    """
    assert self._ts_begin is not None

    if not data:
      # No status file yet; fail hard once the ready timeout has passed
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether usable status data is available
    @raise _ImportExportError: If fetching kept failing for longer than
           the error timeout

    """
    if not success:
      # Remember the first failure; tolerate subsequent ones only until
      # the error timeout expires
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    # Successful update resets the error timer
    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected
    @raise _ImportExportError: If the connect timeout expired

    """
    assert self._daemon, "Daemon status missing"

    # Already seen as connected before
    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      return False

    self._ts_finished = time.time()

    # Exit status 0 counts as success; anything else carries an error
    # message from the daemon
    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    Records the final result and notifies the callbacks; must only be
    called once per object (enforced by the assertion below).

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
                   self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      # Daemon was never started (e.g. the start RPC itself failed)
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @param error: Optional error message marking the transfer as failed
    @rtype: bool
    @return: Whether cleanup succeeded

    """
    assert error or self.success is not None

    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
428

    
429

    
430
class DiskImport(_DiskImportExportBase):
  """Handles the receiving (import) side of a single disk transfer.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, opts, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamps: when the daemon was first seen listening
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    Returns C{None} while no status data has been received yet.

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
                                          self._instance,
                                          self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening
    @raise _ImportExportError: If the listen timeout expired

    """
    assert self._daemon, "Daemon status missing"

    # Already seen as listening before
    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import %r on %s is now listening on port %s",
                    self._daemon_name, self.node_name, port)

      self._cbs.ReportListening(self, self._private)

      return True

    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    The connect timeout for an import only starts counting once the
    daemon is actually listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
517

    
518

    
519
class DiskExport(_DiskImportExportBase):
  """Handles the sending (export) side of a single disk transfer.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, opts,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                   instance, timeouts, cbs, private)

    self._source = source
    self._source_args = source_args
    self._dest_host = dest_host
    self._dest_port = dest_port

  def _StartDaemon(self):
    """Starts the export daemon on the source node.

    """
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._source,
                                          self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    An export daemon connects out rather than listening, hence this
    always holds true.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    The connect timeout for an export counts from daemon start.

    """
    assert self._ts_begin is not None

    return self._ts_begin
577

    
578

    
579
class ImportExportLoop:
  """Polling loop driving a set of import/export daemons to completion.

  Objects are added via L{Add} and processed by L{Run}, which repeatedly
  queries daemon status over RPC and advances each transfer's state
  until none are active.

  """
  #: Lower/upper bounds for the sleep between polling rounds
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @param daemons: Dictionary mapping node name to list of daemon names
    @rtype: dict
    @return: Dictionary mapping node name to a dictionary of daemon name
             to status; nodes whose status RPC failed are left out

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    Also starts daemons that haven't been started yet; objects whose
    daemon fails to start are finalized with an error right here.

    @rtype: dict
    @return: Dictionary mapping node name to list of daemon names

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    Polls all active daemons, advancing each transfer through the
    ready/listening/connected/finished states, and sleeps an adaptive
    delay between rounds until no transfer is active.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            # Status RPC for this node failed; record the failure
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any([diskie.active for diskie in self._queue]):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    @rtype: bool
    @return: Whether all transfers were finalized successfully

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
742

    
743

    
744
class _TransferInstCbBase(ImportExportCbBase):
  """Shared state for the callbacks used during an instance transfer.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
               dest_node, dest_ip, export_opts):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)

    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts
    self.export_opts = export_opts
    self.src_node = src_node
    self.src_cbs = src_cbs
    self.dest_node = dest_node
    self.dest_ip = dest_ip
761

    
762

    
763
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the export (sending) side of an instance transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    # This callback set is only ever installed on the source side
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    msg = "%s is sending data on %s" % (dtp.data.name, ie.node_name)
    self.feedback_fn(msg)

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if not ie.success:
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))
    else:
      self.feedback_fn("%s finished sending data" % dtp.data.name)

    dtp.RecordResult(ie.success)

    finished_fn = dtp.data.finished_fn
    if finished_fn:
      finished_fn()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
799

    
800

    
801
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the import (receiving) side of an instance transfer.

  """
  def ReportListening(self, ie, dtp):
    """Called when daemon started listening.

    Once the import daemon's listening port is known, the matching
    export daemon is created on the source node and added to the loop.

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node, self.export_opts,
                    self.dest_ip, ie.listen_port,
                    self.instance, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name, self.dest_node))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
844

    
845

    
846
class DiskTransfer(object):
  """Describes a single disk transfer between two nodes.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    self.name = name
    self.finished_fn = finished_fn

    # Source side
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    # Destination side
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs
870

    
871

    
872
class _DiskTransferPrivate(object):
873
  def __init__(self, data, success):
874
    """Initializes this class.
875

876
    @type data: L{DiskTransfer}
877
    @type success: bool
878

879
    """
880
    self.data = data
881

    
882
    self.src_export = None
883
    self.dest_import = None
884

    
885
    self.success = success
886

    
887
  def RecordResult(self, success):
888
    """Updates the status.
889

890
    One failed part will cause the whole transfer to fail.
891

892
    """
893
    self.success = self.success and success
894

    
895

    
896
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                         instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node: string
  @param src_node: Source node name
  @type dest_node: string
  @param dest_node: Destination node name
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  # Compress only if transfer is to another node
  if src_node == dest_node:
    compress = constants.IEC_NONE
  else:
    compress = constants.IEC_GZIP

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node, dest_node, compress)

  opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                     compress=compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node, None, dest_node, dest_ip, opts)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node, src_cbs, dest_node, dest_ip, opts)

  all_dtp = []

  ieloop = ImportExportLoop(lu)
  try:
    for transfer in all_transfers:
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node, dest_node))

        dtp = _DiskTransferPrivate(transfer, True)

        # Only the import is started here; the matching export is created
        # by _TransferInstDestCb.ReportListening once the import daemon's
        # listening port is known
        di = DiskImport(lu, dest_node, opts, instance,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Placeholder for a skipped transfer, recorded as failed
        dtp = _DiskTransferPrivate(None, False)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    # Clean up all daemons even if the loop raised
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all([(dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp]), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
970

    
971

    
972
class _RemoteExportCb(ImportExportCbBase):
  """Callbacks for exporting an instance's disks to a remote destination.

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks being exported

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One result slot per disk, filled in as transfers finish
    self._dresults = disk_count * [None]

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private

    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if not ie.success:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
                        (idx, ie.final_message, ie.recent_output))
    else:
      self._feedback_fn("Disk %s finished sending data" % idx)

    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
1012

    
1013

    
1014
class ExportInstanceHelper:
  """Helper for exporting an instance either locally or to a remote cluster.

  Expected usage: call L{CreateSnapshots} first, then either L{LocalExport}
  or L{RemoteExport}, and finally L{Cleanup} to remove any snapshots still
  left over.

  """
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Snapshot disk objects, filled by CreateSnapshots (False entries mark
    # disks whose snapshot failed)
    self._snap_disks = []
    # Per-disk flag recording whether the snapshot was already removed
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    Snapshot failures are logged as warnings, not raised; a failed disk is
    recorded as C{False} in the internal snapshot list.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node

    vgname = self._lu.cfg.GetVGName()

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
        # Placeholder so indexes keep lining up with instance.disks
        new_dev = False
      else:
        disk_id = (vgname, result.payload)
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=disk_id, physical_id=disk_id,
                               iv_name=disk.iv_name)

      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    Safe to call multiple times; removal is only attempted once per disk
    and only if the snapshot was created successfully.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node))

      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
      if result.fail_msg:
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node, result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node
    @return: tuple of (finalization status, per-disk transfer results)

    """
    instance = self._instance
    src_node = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      # Disks whose snapshot failed are skipped (None transfer)
      if not dev:
        transfers.append(None)
        continue

      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
                            dev.physical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, (dev, idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node, dest_node.name,
                                    dest_node.secondary_ip,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, opts, disk_info, timeouts):
    """Inter-cluster instance export.

    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @return: tuple of (True, per-disk transfer results); the first element
      is always True, per-disk status is in the second element

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, _, _)) in enumerate(zip(instance.disks,
                                                          disk_info)):
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    Removes the snapshot for the disk as soon as its data has been sent.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
1187

    
1188

    
1189
class _RemoteImportCb(ImportExportCbBase):
  """Import/export callbacks for receiving an instance from another cluster.

  Collects the listening port of every disk import daemon and, once all of
  them are ready, sends the signed connection details to the client via the
  feedback function.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk transfer status; None while still in progress
    self._dresults = disk_count * [None]
    # Listening port of each disk's import daemon; None when not listening
    self._daemon_port = disk_count * [None]

    # Salt used when signing the disk information sent to the client
    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    if compat.all(dport is not None for dport in self._daemon_port):
      host = self._external_address

      disks = [ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                           disk_idx, host, dport)
               for (disk_idx, dport) in enumerate(self._daemon_port)]

      assert len(disks) == self._disk_count

      self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
        "disks": disks,
        "x509_ca": self._x509_cert_pem,
        })

  def ReportListening(self, ie, private):
    """Called when daemon started listening.

    """
    (disk_idx, ) = private

    self._feedback_fn("Disk %s is now listening" % disk_idx)

    assert self._daemon_port[disk_idx] is None

    self._daemon_port[disk_idx] = ie.listen_port

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (disk_idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % disk_idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (disk_idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[disk_idx] = None

    success = bool(ie.success)
    if success:
      self._feedback_fn("Disk %s finished receiving data" % disk_idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %r)") %
                        (disk_idx, ie.final_message, ie.recent_output))

    self._dresults[disk_idx] = success
1285

    
1286

    
1287
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @return: Per-disk import results as collected by L{_RemoteImportCb}

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Import daemon options
    opts = objects.ImportExportOptions(key_name=x509_key_name,
                                       ca_pem=source_ca_pem)

    # Sign certificate with the cluster domain secret and a fresh salt
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), instance.primary_node)

    ieloop = ImportExportLoop(lu)
    try:
      # One import daemon per disk; the callbacks send the connection
      # details to the client once all daemons are listening
      for idx, dev in enumerate(instance.disks):
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              constants.IEIO_SCRIPT, (dev, idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
1343

    
1344

    
1345
def _GetImportExportHandshakeMessage(version):
  """Builds the plain-text handshake message for a RIE protocol version.

  @type version: number
  @param version: RIE protocol version
  @rtype: string

  """
  message = "%s:%s" % (version, constants.RIE_HANDSHAKE)
  return message
1352

    
1353

    
1354
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  The result is a (version, HMAC digest, salt) tuple which the remote
  cluster can verify with L{CheckRemoteExportHandshake}.

  @type cds: string
  @param cds: Cluster domain secret

  """
  message = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
  hmac_salt = utils.GenerateSecret(8)
  return (constants.RIE_VERSION,
          utils.Sha1Hmac(cds, message, salt=hmac_salt),
          hmac_salt)
1364

    
1365

    
1366
def CheckRemoteExportHandshake(cds, handshake):
1367
  """Checks the handshake of a remote import/export.
1368

1369
  @type cds: string
1370
  @param cds: Cluster domain secret
1371
  @type handshake: sequence
1372
  @param handshake: Handshake sent by remote peer
1373

1374
  """
1375
  try:
1376
    (version, hmac_digest, hmac_salt) = handshake
1377
  except (TypeError, ValueError), err:
1378
    return "Invalid data: %s" % err
1379

    
1380
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1381
                              hmac_digest, salt=hmac_salt):
1382
    return "Hash didn't match, clusters don't share the same domain secret"
1383

    
1384
  if version != constants.RIE_VERSION:
1385
    return ("Clusters don't have the same remote import/export protocol"
1386
            " (local=%s, remote=%s)" %
1387
            (constants.RIE_VERSION, version))
1388

    
1389
  return None
1390

    
1391

    
1392
def _GetRieDiskInfoMessage(disk_index, host, port):
1393
  """Returns the hashed text for import/export disk information.
1394

1395
  @type disk_index: number
1396
  @param disk_index: Index of disk (included in hash)
1397
  @type host: string
1398
  @param host: Hostname
1399
  @type port: number
1400
  @param port: Daemon port
1401

1402
  """
1403
  return "%s:%s:%s" % (disk_index, host, port)
1404

    
1405

    
1406
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1407
  """Verifies received disk information for an export.
1408

1409
  @type cds: string
1410
  @param cds: Cluster domain secret
1411
  @type disk_index: number
1412
  @param disk_index: Index of disk (included in hash)
1413
  @type disk_info: sequence
1414
  @param disk_info: Disk information sent by remote peer
1415

1416
  """
1417
  try:
1418
    (host, port, hmac_digest, hmac_salt) = disk_info
1419
  except (TypeError, ValueError), err:
1420
    raise errors.GenericError("Invalid data: %s" % err)
1421

    
1422
  if not (host and port):
1423
    raise errors.GenericError("Missing destination host or port")
1424

    
1425
  msg = _GetRieDiskInfoMessage(disk_index, host, port)
1426

    
1427
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1428
    raise errors.GenericError("HMAC is wrong")
1429

    
1430
  return (host, port)
1431

    
1432

    
1433
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @return: (host, port, HMAC digest, salt) tuple

  """
  digest = utils.Sha1Hmac(cds, _GetRieDiskInfoMessage(disk_index, host, port),
                          salt=salt)
  return (host, port, digest, salt)