Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ 033a1d00

History | View | Annotate | Download (19.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28

    
29
from ganeti import constants
30
from ganeti import errors
31
from ganeti import compat
32

    
33

    
34
class _ImportExportError(Exception):
  """Module-internal exception signalling a failed import or export.

  Raised by the import/export helper classes in this module and handled by
  the main loop, which then finalizes the affected transfer with the error
  message.

  """
38

    
39

    
40
class ImportExportTimeouts(object):
  """Set of timeouts (in seconds) applied to one disk import/export.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT):
    """Stores the given timeouts.

    Only the connection timeout is mandatory, all other values fall back to
    the class defaults.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready

    """
    self.connect = connect
    self.listen = listen
    self.ready = ready
    self.error = error
77

    
78

    
79
class ImportExportCbBase(object):
  """Base class for disk import/export callbacks.

  All callbacks are no-ops here; subclasses override the ones they are
  interested in.

  """
  def ReportListening(self, ie, private):
    """Invoked once the daemon has started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportConnected(self, ie, private):
    """Invoked once a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Invoked once a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
109

    
110

    
111
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
  """Tells whether more than C{timeout} has passed since C{epoch}.

  @type epoch: number
  @param epoch: Start timestamp
  @type timeout: number
  @param timeout: Allowed duration, in the same unit as C{epoch}
  @param _time_fn: Function returning the current time (replaceable for
    testing)
  @rtype: bool
  @return: True only if the current time is strictly after
    C{epoch + timeout}

  """
  now = _time_fn()
  deadline = epoch + timeout
  return now > deadline
116

    
117

    
118
class _DiskImportExportBase(object):
  """Base class tracking a single import/export daemon on a node.

  Subclasses must set L{MODE_TEXT} and implement L{_StartDaemon},
  L{CheckListening} and L{_GetConnectedCheckEpoch}.

  """
  #: Human-readable mode name used in log messages; set by subclasses
  MODE_TEXT = None

  def __init__(self, lu, node_name, x509_key_name, remote_x509_ca,
               instance, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type x509_key_name: string
    @param x509_key_name: Name of X509 key (None for node daemon key)
    @type remote_x509_ca: string
    @param remote_x509_ca: Remote peer's CA (None for node daemon certificate)
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_name = node_name
    self._x509_key_name = x509_key_name
    self._remote_x509_ca = remote_x509_ca
    self._instance = instance
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Parent loop
    self._loop = None

    # Timestamps
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_error = None

    # Transfer status
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    @return: The daemon's recent output, or None if no status data has been
      received yet

    """
    if self._daemon:
      return self._daemon.recent_output

    return None

  @property
  def active(self):
    """Determines whether this transport is still active.

    A transfer is active until its result has been reported, i.e. as long as
    L{success} is None.

    @rtype: bool

    """
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}
    @raise errors.ProgrammerError: If a loop has already been set

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Must be implemented by subclasses; the returned RPC result is expected
    to provide C{fail_msg} and C{payload} (the daemon name).

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name
    @raise _ImportExportError: If the daemon could not be started

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      # All "ready"/"listen"/"connect" timeouts are measured from here
      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    The daemon must have been started and not yet cleaned up.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: False if the abort RPC failed, True otherwise (including when
      there is no daemon to abort)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s %r on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name)
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: True if status data was stored, False if the daemon has not
      provided any yet
    @raise _ImportExportError: If no data is available after the "ready"
      timeout

    """
    assert self._ts_begin is not None

    if not data:
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether usable status data was stored
    @raise _ImportExportError: If fetching keeps failing for longer than the
      "error" timeout, or the daemon doesn't become ready in time

    """
    if not success:
      if self._ts_last_error is None:
        # First failure in a row, start measuring the error timeout
        self._ts_last_error = time.time()

      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    # A successful fetch resets the error timeout
    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    Must be implemented by subclasses; the returned timestamp is the start
    of the period covered by the "connect" timeout.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    The L{ImportExportCbBase.ReportConnected} callback is invoked the first
    time the daemon reports being connected.

    @rtype: bool
    @return: Whether the daemon is connected
    @raise _ImportExportError: If the "connect" timeout expired

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s %r on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_name)

      self._cbs.ReportConnected(self, self._private)

      return True

    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def CheckFinished(self):
    """Checks whether the daemon exited.

    The result is reported (see L{_ReportFinished}) the first time the
    daemon status contains an exit status.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    # Compare against None like the other timestamp checks in this class
    if self._ts_finished is not None:
      return True

    if self._daemon.exit_status is None:
      return False

    self._ts_finished = time.time()

    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    Records the result, logs it and invokes the
    L{ImportExportCbBase.ReportFinished} callback. Must be called at most
    once per transfer.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
                   self.node_name)
    elif self._daemon_name:
      self._lu.LogWarning("%s %r on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name, self.node_name,
                          message)
    else:
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self.node_name, message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    @return: Result of the cleanup RPC

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    Cleans up the daemon on the node (if one was started) and, if an error
    message is given, reports the transfer as failed.

    This may also be called for a transfer whose result has not been
    reported yet, e.g. by L{ImportExportLoop.FinalizeAll} after the main loop
    was interrupted; the daemon is still cleaned up in that case instead of
    failing an assertion.

    @type error: string
    @param error: Error message; if set, the transfer is reported as failed
    @rtype: bool
    @return: False if the cleanup RPC failed (nothing is reported then),
      True otherwise

    """
    if self._daemon_name:
      logging.info("Finalizing %s %r on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_name)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_name, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
428

    
429

    
430
class DiskImport(_DiskImportExportBase):
  """Import side of a disk transfer: starts a daemon that listens for data.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_name, x509_key_name, source_x509_ca, instance,
               dest, dest_args, timeouts, cbs, private=None):
    """Sets up a disk import.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for import
    @type x509_key_name: string
    @param x509_key_name: Name of X509 key (None for node daemon key)
    @type source_x509_ca: string
    @param source_x509_ca: Remote peer's CA (None for node daemon certificate)
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, x509_key_name,
                                   source_x509_ca, instance, timeouts,
                                   cbs, private)

    # Where the received data is written to
    self._dest = dest
    self._dest_args = dest_args

    # Time at which the daemon was first seen listening
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    @return: Port as reported by the daemon, None while no daemon status is
      available

    """
    if not self._daemon:
      return None

    return self._daemon.listen_port

  def _StartDaemon(self):
    """Starts the import daemon.

    @return: Result of the C{import_start} RPC

    """
    rpc = self._lu.rpc
    return rpc.call_import_start(self.node_name, self._x509_key_name,
                                 self._remote_x509_ca, self._instance,
                                 self._dest, self._dest_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    The L{ImportExportCbBase.ReportListening} callback is invoked the first
    time the daemon reports a listening port.

    @rtype: bool
    @return: Whether the daemon is listening
    @raise _ImportExportError: If the "listen" timeout expired

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      # Already seen listening earlier
      return True

    port = self._daemon.listen_port
    if port is None:
      # No port yet, give up once the timeout has passed
      if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
        raise _ImportExportError("Not listening after %s seconds" %
                                 self._timeouts.listen)

      return False

    self._ts_listening = time.time()

    logging.debug("Import %r on %s is now listening on port %s",
                  self._daemon_name, self.node_name, port)

    self._cbs.ReportListening(self, self._private)

    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    The connect timeout of an import is measured from this timestamp.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
521

    
522

    
523
class DiskExport(_DiskImportExportBase):
  """Export side of a disk transfer: starts a daemon that connects to a peer.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_name, x509_key_name, dest_x509_ca,
               dest_host, dest_port, instance, source, source_args,
               timeouts, cbs, private=None):
    """Sets up a disk export.

    @param lu: Logical unit instance
    @type node_name: string
    @param node_name: Node name for export
    @type x509_key_name: string
    @param x509_key_name: Name of X509 key (None for node daemon key)
    @type dest_x509_ca: string
    @param dest_x509_ca: Remote peer's CA (None for node daemon certificate)
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_name, x509_key_name,
                                   dest_x509_ca, instance, timeouts,
                                   cbs, private)

    # Where the data is read from
    self._source = source
    self._source_args = source_args

    # Peer to send the data to
    self._dest_host = dest_host
    self._dest_port = dest_port

  def _StartDaemon(self):
    """Starts the export daemon.

    @return: Result of the C{export_start} RPC

    """
    rpc = self._lu.rpc
    return rpc.call_export_start(self.node_name, self._x509_key_name,
                                 self._remote_x509_ca,
                                 self._dest_host, self._dest_port,
                                 self._instance, self._source,
                                 self._source_args)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Always True, as only an import can be listening

    """
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    The connect timeout of an export is measured from this timestamp.

    """
    begin = self._ts_begin
    assert begin is not None

    return begin
585

    
586

    
587
class ImportExportLoop:
588
  MIN_DELAY = 1.0
589
  MAX_DELAY = 20.0
590

    
591
  def __init__(self, lu):
592
    """Initializes this class.
593

594
    """
595
    self._lu = lu
596
    self._queue = []
597
    self._pending_add = []
598

    
599
  def Add(self, diskie):
600
    """Adds an import/export object to the loop.
601

602
    @type diskie: Subclass of L{_DiskImportExportBase}
603
    @param diskie: Import/export object
604

605
    """
606
    assert diskie not in self._pending_add
607
    assert diskie.loop is None
608

    
609
    diskie.SetLoop(self)
610

    
611
    # Adding new objects to a staging list is necessary, otherwise the main
612
    # loop gets confused if callbacks modify the queue while the main loop is
613
    # iterating over it.
614
    self._pending_add.append(diskie)
615

    
616
  @staticmethod
617
  def _CollectDaemonStatus(lu, daemons):
618
    """Collects the status for all import/export daemons.
619

620
    """
621
    daemon_status = {}
622

    
623
    for node_name, names in daemons.iteritems():
624
      result = lu.rpc.call_impexp_status(node_name, names)
625
      if result.fail_msg:
626
        lu.LogWarning("Failed to get daemon status on %s: %s",
627
                      node_name, result.fail_msg)
628
        continue
629

    
630
      assert len(names) == len(result.payload)
631

    
632
      daemon_status[node_name] = dict(zip(names, result.payload))
633

    
634
    return daemon_status
635

    
636
  @staticmethod
637
  def _GetActiveDaemonNames(queue):
638
    """Gets the names of all active daemons.
639

640
    """
641
    result = {}
642
    for diskie in queue:
643
      if not diskie.active:
644
        continue
645

    
646
      try:
647
        # Start daemon if necessary
648
        daemon_name = diskie.CheckDaemon()
649
      except _ImportExportError, err:
650
        logging.exception("%s failed", diskie.MODE_TEXT)
651
        diskie.Finalize(error=str(err))
652
        continue
653

    
654
      result.setdefault(diskie.node_name, []).append(daemon_name)
655

    
656
    assert len(queue) >= len(result)
657
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
658

    
659
    logging.debug("daemons=%r", result)
660

    
661
    return result
662

    
663
  def _AddPendingToQueue(self):
664
    """Adds all pending import/export objects to the internal queue.
665

666
    """
667
    assert compat.all(diskie not in self._queue and diskie.loop == self
668
                      for diskie in self._pending_add)
669

    
670
    self._queue.extend(self._pending_add)
671

    
672
    del self._pending_add[:]
673

    
674
  def Run(self):
675
    """Utility main loop.
676

677
    """
678
    while True:
679
      self._AddPendingToQueue()
680

    
681
      # Collect all active daemon names
682
      daemons = self._GetActiveDaemonNames(self._queue)
683
      if not daemons:
684
        break
685

    
686
      # Collection daemon status data
687
      data = self._CollectDaemonStatus(self._lu, daemons)
688

    
689
      # Use data
690
      delay = self.MAX_DELAY
691
      for diskie in self._queue:
692
        if not diskie.active:
693
          continue
694

    
695
        try:
696
          try:
697
            all_daemon_data = data[diskie.node_name]
698
          except KeyError:
699
            result = diskie.SetDaemonData(False, None)
700
          else:
701
            result = \
702
              diskie.SetDaemonData(True,
703
                                   all_daemon_data[diskie.GetDaemonName()])
704

    
705
          if not result:
706
            # Daemon not yet ready, retry soon
707
            delay = min(3.0, delay)
708
            continue
709

    
710
          if diskie.CheckFinished():
711
            # Transfer finished
712
            diskie.Finalize()
713
            continue
714

    
715
          # Normal case: check again in 5 seconds
716
          delay = min(5.0, delay)
717

    
718
          if not diskie.CheckListening():
719
            # Not yet listening, retry soon
720
            delay = min(1.0, delay)
721
            continue
722

    
723
          if not diskie.CheckConnected():
724
            # Not yet connected, retry soon
725
            delay = min(1.0, delay)
726
            continue
727

    
728
        except _ImportExportError, err:
729
          logging.exception("%s failed", diskie.MODE_TEXT)
730
          diskie.Finalize(error=str(err))
731

    
732
      if not compat.any([diskie.active for diskie in self._queue]):
733
        break
734

    
735
      # Wait a bit
736
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
737
      logging.debug("Waiting for %ss", delay)
738
      time.sleep(delay)
739

    
740
  def FinalizeAll(self):
741
    """Finalizes all pending transfers.
742

743
    """
744
    success = True
745

    
746
    for diskie in self._queue:
747
      success = diskie.Finalize() and success
748

    
749
    return success