Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ f362096f

History | View | Annotate | Download (18.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Client side of the RPC calls made to the cluster nodes
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29

    
30
from twisted.internet.pollreactor import PollReactor
31

    
32
class ReReactor(PollReactor):
  """PollReactor variant whose run() can be invoked more than once.

  """
  def run(self, installSignalHandlers=1):
    """Prepare the reactor for a (possibly repeated) run and start it.

    Before handing over to PollReactor.run, the shutdown triggers are
    registered again if the 'shutdown' entry is missing from the event
    triggers, and the threadpool is started again if it exists and has
    been joined. Both conditions are expected on the second and later
    runs of the reactor.

    Args:
      - installSignalHandlers: passed unchanged to PollReactor.run

    Returns:
      whatever PollReactor.run returns

    """
    if 'shutdown' not in self._eventTriggers:
      # the shutdown queue is gone, so this is most probably not the
      # first run; put the standard shutdown handlers back
      for handler in (self.crash, self.disconnectAll):
        self.addSystemEventTrigger('during', 'shutdown', handler)

    pool = self.threadpool
    if pool is not None and pool.joined == 1:
      # a stopped threadpool is started again, and scheduled to be
      # stopped once more when the reactor shuts down
      pool.start()
      self.addSystemEventTrigger('during', 'shutdown', pool.stop)

    return PollReactor.run(self, installSignalHandlers)
57

    
58

    
59
import twisted.internet.main
60
# Install the restartable reactor as the global Twisted reactor. This is
# done here, ahead of the "from twisted.internet import reactor" import
# further down in this module.
# NOTE(review): presumably the ordering is required so that import picks up
# the ReReactor instance instead of a default reactor - confirm.
twisted.internet.main.installReactor(ReReactor())
61

    
62
from twisted.spread import pb
63
from twisted.internet import reactor
64
from twisted.cred import credentials
65
from OpenSSL import SSL, crypto
66

    
67
from ganeti import logger
68
from ganeti import utils
69
from ganeti import errors
70
from ganeti import constants
71
from ganeti import objects
72
from ganeti import ssconf
73

    
74
class NodeController:
  """Per-node bookkeeping for a single RPC call.

  One instance is created for every node we talk to. It remembers the
  node and the owning client, and provides the callbacks that record
  the outcome for that node in the owner's results dict.

  """
  def __init__(self, parent, node):
    """Remember the owning client and the node being handled.

    Args:
      - parent: object providing procedure, args, results and nc
      - node: the node this controller is responsible for

    """
    self.parent = parent
    self.node = node

  def _check_end(self):
    """Stop the reactor once there are as many results as nodes.

    """
    owner = self.parent
    if len(owner.results) == len(owner.nc):
      reactor.stop()

  def cb_call(self, obj):
    """Callback for a successful connect and login.

    Issues the actual remote call on the object we were given and
    hooks up the result and error callbacks for it.

    Args:
      - obj: the remote object on which callRemote is invoked

    """
    pending = obj.callRemote(self.parent.procedure, self.parent.args)
    pending.addCallbacks(self.cb_done, self.cb_err2)

  def cb_done(self, result):
    """Callback for a remote call that returned.

    The result is a (traceback, value) pair. The value is always
    stored as this node's result; if the traceback part is set, a
    generic RemoteError is raised afterwards (the actual exception
    can't be passed over yet).

    Args:
      - result: two-element sequence (traceback, value)

    """
    tb, payload = result
    self.parent.results[self.node] = payload
    self._check_end()
    if not tb:
      return
    message = ("Remote procedure error calling %s on %s:"
               "\n%s" % (self.parent.procedure, self.node, tb))
    raise errors.RemoteError(message)

  def cb_err1(self, reason):
    """Error callback for a failed connect.

    Logs the failure and records False as this node's result.

    """
    message = ("caller_connect: could not connect to remote host %s,"
               " reason %s" % (self.node, reason))
    logger.Error(message)
    self.parent.results[self.node] = False
    self._check_end()

  def cb_err2(self, reason):
    """Error callback for a failed call.

    This covers the call not returning anything, not even an error,
    or timing out, etc. The failure is logged and False is recorded
    as this node's result.

    """
    message = ("caller_call: could not call %s on node %s,"
               " reason %s" % (self.parent.procedure, self.node, reason))
    logger.Error(message)
    self.parent.results[self.node] = False
    self._check_end()
140

    
141

    
142
class MirrorContextFactory:
143
  """Certificate verifier factory.
144

145
  This factory creates contexts that verify if the remote end has a
146
  specific certificate (i.e. our own certificate).
147

148
  The checks we do are that the PEM dump of the certificate is the
149
  same as our own and (somewhat redundantly) that the SHA checksum is
150
  the same.
151

152
  """
153
  isClient = 1
154

    
155
  def __init__(self):
156
    try:
157
      fd = open(constants.SSL_CERT_FILE, 'r')
158
      try:
159
        data = fd.read(16384)
160
      finally:
161
        fd.close()
162
    except EnvironmentError, err:
163
      raise errors.ConfigurationError("missing SSL certificate: %s" %
164
                                      str(err))
165
    self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166
    self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167
    self.mydigest = self.mycert.digest('SHA')
168

    
169
  def verifier(self, conn, x509, errno, err_depth, retcode):
170
    """Certificate verify method.
171

172
    """
173
    if self.mydigest != x509.digest('SHA'):
174
      return False
175
    if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
176
      return False
177
    return True
178

    
179
  def getContext(self):
180
    """Context generator.
181

182
    """
183
    context = SSL.Context(SSL.TLSv1_METHOD)
184
    context.set_verify(SSL.VERIFY_PEER, self.verifier)
185
    return context
186

    
187
class Client:
  """RPC client.

  Given a (remote) procedure name and its list of arguments, an
  instance contacts (in parallel) every node added to it and collects
  a dict of results (key: node name, value: result).

  One current bug is that a generic failure is still signalled by a
  'False' result, which is not good, as a remote procedure can
  legitimately return False too; this overloading can cause bugs.

  """
  # NOTE(review): the three attributes below are not used anywhere in the
  # visible part of this module; kept in case external code reads them
  result_set = False
  result = False
  allresult = []

  def __init__(self, procedure, args):
    """Set up a call of the given procedure.

    The node daemon port and password are read from the simple store.

    Args:
      - procedure: name of the remote procedure
      - args: the arguments, passed as a single value to callRemote

    """
    store = ssconf.SimpleStore()
    self.port = store.GetNodeDaemonPort()
    self.nodepw = store.GetNodeDaemonPassword()
    self.nc = {}
    self.results = {}
    self.procedure = procedure
    self.args = args

  #--- generic connector -------------

  def connect_list(self, node_list):
    """Add every node of the given list to the target nodes.

    """
    for target in node_list:
      self.connect(target)

  def connect(self, connect_node):
    """Add a single node to the target nodes.

    A NodeController is registered for the node, an SSL connection is
    requested from the reactor and the login as "master_node" is
    queued, with the controller's callbacks attached to it.

    """
    factory = pb.PBClientFactory()
    controller = NodeController(self, connect_node)
    self.nc[connect_node] = controller
    reactor.connectSSL(connect_node, self.port, factory,
                       MirrorContextFactory())
    creds = credentials.UsernamePassword("master_node", self.nodepw)
    login = factory.login(creds)
    login.addCallbacks(controller.cb_call, controller.cb_err1)

  def getresult(self):
    """Return the dict of results collected so far.

    """
    return self.results

  def run(self):
    """Wrapper over reactor.run().

    The reactor is only run if at least one node has been added;
    otherwise nothing is done.

    """
    if not self.nc:
      return
    reactor.run()
248

    
249

    
250
def call_volume_list(node_list, vg_name):
  """Ask nodes for the logical volumes of a volume group.

  Args:
    - node_list: the nodes to contact
    - vg_name: the volume group, passed to the remote "volume_list"

  Returns:
    the dict of per-node results

  This is a multi-node call.

  """
  client = Client("volume_list", [vg_name])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
260

    
261

    
262
def call_vg_list(node_list):
  """Ask nodes for their volume group list.

  Args:
    - node_list: the nodes to contact

  Returns:
    the dict of per-node results

  This is a multi-node call.

  """
  client = Client("vg_list", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
272

    
273

    
274
def call_bridges_exist(node, bridges_list):
  """Check whether a node has all the given bridges.

  The remote "bridges_exist" procedure is asked about the bridges in
  bridges_list, so that an instance using interfaces on those bridges
  can be started on that node.

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("bridges_exist", [bridges_list])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
288

    
289

    
290
def call_instance_start(node, instance, extra_args):
  """Ask a node to start an instance.

  Args:
    - node: the node to contact
    - instance: the instance object, sent in its ToDict() form
    - extra_args: forwarded unchanged to the remote procedure

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  params = [instance.ToDict(), extra_args]
  client = Client("instance_start", params)
  client.connect(node)
  client.run()
  return client.getresult().get(node, False)
300

    
301

    
302
def call_instance_shutdown(node, instance):
  """Ask a node to stop an instance.

  Args:
    - node: the node to contact
    - instance: the instance object, sent in its ToDict() form

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  params = [instance.ToDict()]
  client = Client("instance_shutdown", params)
  client.connect(node)
  client.run()
  return client.getresult().get(node, False)
312

    
313

    
314
def call_instance_reboot(node, instance, reboot_type, extra_args):
  """Ask a node to reboot an instance.

  Args:
    - node: the node to contact
    - instance: the instance object, sent in its ToDict() form
    - reboot_type: forwarded unchanged to the remote procedure
    - extra_args: forwarded unchanged to the remote procedure

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  params = [instance.ToDict(), reboot_type, extra_args]
  client = Client("instance_reboot", params)
  client.connect(node)
  client.run()
  return client.getresult().get(node, False)
324

    
325

    
326
def call_instance_os_add(node, inst, osdev, swapdev):
  """Ask a node to install the OS on the given instance.

  Args:
    - node: the node to contact
    - inst: the instance object, sent in its ToDict() form
    - osdev, swapdev: forwarded unchanged to the remote procedure

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("instance_os_add", [inst.ToDict(), osdev, swapdev])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
337

    
338

    
339
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
  """Ask a node to run the OS rename script for an instance.

  Args:
    - node: the node to contact
    - inst: the instance object, sent in its ToDict() form
    - old_name, osdev, swapdev: forwarded unchanged, in this order

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("instance_run_rename",
                  [inst.ToDict(), old_name, osdev, swapdev])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
350

    
351

    
352
def call_instance_info(node, instance):
  """Ask a node for information about a single instance.

  Args:
    - node: the node to contact
    - instance: forwarded unchanged to the remote "instance_info"

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("instance_info", [instance])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
362

    
363

    
364
def call_all_instances_info(node_list):
  """Ask nodes for information about all their instances.

  Args:
    - node_list: the nodes to contact

  Returns:
    the dict of per-node results

  This is a multi-node call.

  """
  client = Client("all_instances_info", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
374

    
375

    
376
def call_instance_list(node_list):
  """Ask nodes for the list of their running instances.

  Args:
    - node_list: the nodes to contact

  Returns:
    the dict of per-node results

  This is a multi-node call.

  """
  client = Client("instance_list", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
386

    
387

    
388
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
  """Ask a node to do a TcpPing.

  Args:
    - node: the node to contact
    - source, target, port, timeout, live_port_needed: forwarded
      unchanged, in this order, to the remote "node_tcp_ping"

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  params = [source, target, port, timeout, live_port_needed]
  client = Client("node_tcp_ping", params)
  client.connect(node)
  client.run()
  return client.getresult().get(node, False)
398

    
399

    
400
def call_node_info(node_list, vg_name):
  """Ask nodes for their node information.

  The expected keys are the memory information (total, dom0, free)
  and the volume group size and free space.

  Args:
    - node_list: the nodes to contact
    - vg_name: the volume group, passed to the remote "node_info"

  Returns:
    the dict of per-node results

  This is a multi-node call.

  """
  client = Client("node_info", [vg_name])
  client.connect_list(node_list)
  client.run()
  all_info = client.getresult()

  for node_name in all_info:
    node_info = all_info[node_name]
    if type(node_info) is not dict:
      logger.Error("could not connect to node %s" % (node_name))
      # NOTE(review): this replacement dict is never stored back into
      # all_info, so the entry of a failed node is returned unchanged
      node_info = {}

    # presumably fills in the keys missing from node_info with these
    # defaults - confirm against utils.CheckDict
    defaults = {
      'memory_total' : '-',
      'memory_dom0' : '-',
      'memory_free' : '-',
      'vg_size' : 'node_unreachable',
      'vg_free' : '-',
      }
    utils.CheckDict(node_info, defaults, "call_node_info")
  return all_info
429

    
430

    
431
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
  """Ask a node to add itself to the cluster.

  Args:
    - node: the node to contact
    - dsa, dsapub, rsa, rsapub, ssh, sshpub: forwarded unchanged, in
      this order, to the remote "node_add"

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("node_add", [dsa, dsapub, rsa, rsapub, ssh, sshpub])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
442

    
443

    
444
def call_node_verify(node_list, checkdict):
  """Ask nodes to verify the given parameters.

  Args:
    - node_list: the nodes to contact
    - checkdict: forwarded unchanged to the remote "node_verify"

  Returns:
    the dict of per-node results

  This is a multi-node call.

  """
  client = Client("node_verify", [checkdict])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
454

    
455

    
456
def call_node_start_master(node):
  """Tell a node to activate itself as a master.

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("node_start_master", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
466

    
467

    
468
def call_node_stop_master(node):
  """Tell a node to demote itself from master status.

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("node_stop_master", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
478

    
479

    
480
def call_version(node_list):
  """Ask nodes for their version.

  Args:
    - node_list: the nodes to contact

  Returns:
    the dict of per-node results

  This is a multi-node call.

  """
  client = Client("version", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
490

    
491

    
492
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
  """Ask a node to create a given block device.

  Args:
    - node: the node to contact
    - bdev: the block device object, sent in its ToDict() form
    - size, owner, on_primary, info: forwarded unchanged, in this order

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("blockdev_create",
                  [bdev.ToDict(), size, owner, on_primary, info])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
503

    
504

    
505
def call_blockdev_remove(node, bdev):
  """Ask a node to remove a given block device.

  Args:
    - node: the node to contact
    - bdev: the block device object, sent in its ToDict() form

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  params = [bdev.ToDict()]
  client = Client("blockdev_remove", params)
  client.connect(node)
  client.run()
  return client.getresult().get(node, False)
515

    
516

    
517
def call_blockdev_rename(node, devlist):
  """Ask a node to rename the given block devices.

  Args:
    - node: the node to contact
    - devlist: iterable of (device, uid) pairs; each device is sent in
      its ToDict() form, each uid unchanged

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("blockdev_rename",
                  [(dev.ToDict(), new_uid) for dev, new_uid in devlist])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
528

    
529

    
530
def call_blockdev_assemble(node, disk, owner, on_primary):
  """Ask a node to assemble a given block device.

  Args:
    - node: the node to contact
    - disk: the disk object, sent in its ToDict() form
    - owner, on_primary: forwarded unchanged, in this order

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("blockdev_assemble", [disk.ToDict(), owner, on_primary])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
541

    
542

    
543
def call_blockdev_shutdown(node, disk):
  """Ask a node to shut down a given block device.

  Args:
    - node: the node to contact
    - disk: the disk object, sent in its ToDict() form

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  params = [disk.ToDict()]
  client = Client("blockdev_shutdown", params)
  client.connect(node)
  client.run()
  return client.getresult().get(node, False)
553

    
554

    
555
def call_blockdev_addchildren(node, bdev, ndevs):
  """Ask a node to add a list of children to a (mirroring) device.

  Args:
    - node: the node to contact
    - bdev: the parent device object, sent in its ToDict() form
    - ndevs: the children; each one is sent in its ToDict() form

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  parent_dict = bdev.ToDict()
  child_dicts = [child.ToDict() for child in ndevs]
  client = Client("blockdev_addchildren", [parent_dict, child_dicts])
  client.connect(node)
  client.run()
  return client.getresult().get(node, False)
566

    
567

    
568
def call_blockdev_removechildren(node, bdev, ndevs):
  """Ask a node to remove a list of children from a (mirroring) device.

  Args:
    - node: the node to contact
    - bdev: the parent device object, sent in its ToDict() form
    - ndevs: the children; each one is sent in its ToDict() form

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  parent_dict = bdev.ToDict()
  child_dicts = [child.ToDict() for child in ndevs]
  client = Client("blockdev_removechildren", [parent_dict, child_dicts])
  client.connect(node)
  client.run()
  return client.getresult().get(node, False)
579

    
580

    
581
def call_blockdev_getmirrorstatus(node, disks):
  """Ask a node for the status of (mirroring) devices.

  Args:
    - node: the node to contact
    - disks: the disk objects; their ToDict() forms make up the
      argument list of the remote "blockdev_getmirrorstatus"

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("blockdev_getmirrorstatus",
                  [disk.ToDict() for disk in disks])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
592

    
593

    
594
def call_blockdev_find(node, disk):
  """Ask a node to identify a given block device.

  Args:
    - node: the node to contact
    - disk: the disk object, sent in its ToDict() form

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  params = [disk.ToDict()]
  client = Client("blockdev_find", params)
  client.connect(node)
  client.run()
  return client.getresult().get(node, False)
604

    
605

    
606
def call_upload_file(node_list, file_name):
  """Upload a file to the given nodes.

  The whole file is read into memory and sent together with its name,
  mode, owner, group, access time and modification time (as reported
  by os.stat) to the remote "upload_file" procedure.

  The node will refuse the operation in case the file is not on the
  approved file list.

  Args:
    - node_list: the nodes to contact
    - file_name: path of the local file to upload

  Returns:
    the dict of per-node results

  Raises:
    whatever open(), read() or os.stat raise for an unusable file

  This is a multi-node call.

  """
  # open() rather than the file() constructor: it is the documented way
  # of opening files and what the rest of this module uses
  fh = open(file_name)
  try:
    data = fh.read()
  finally:
    fh.close()
  st = os.stat(file_name)
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
            st.st_atime, st.st_mtime]
  c = Client("upload_file", params)
  c.connect_list(node_list)
  c.run()
  return c.getresult()
627

    
628

    
629
def call_os_diagnose(node_list):
  """Ask nodes for a diagnose of their OS definitions.

  Args:
    - node_list: the nodes to contact

  Returns:
    a new dict mapping each node that has a result to a list of
    objects built with objects.OS.FromDict; a false result (which
    includes a failed node) gives an empty list

  This is a multi-node call.

  """
  client = Client("os_diagnose", [])
  client.connect_list(node_list)
  client.run()
  raw_results = client.getresult()
  converted = {}
  for node_name in raw_results:
    os_dicts = raw_results[node_name]
    if not os_dicts:
      converted[node_name] = []
      continue
    converted[node_name] = [objects.OS.FromDict(entry) for entry in os_dicts]
  return converted
647

    
648

    
649
def call_os_get(node, name):
  """Ask a node for an OS definition.

  Args:
    - node: the node to contact
    - name: forwarded unchanged to the remote "os_get"

  Returns:
    if the node's result is a dict, the object built from it with
    objects.OS.FromDict; otherwise the result itself (False if no
    result was recorded for the node)

  This is a single-node call.

  """
  client = Client("os_get", [name])
  client.connect(node)
  client.run()
  os_data = client.getresult().get(node, False)
  if not isinstance(os_data, dict):
    return os_data
  return objects.OS.FromDict(os_data)
663

    
664

    
665
def call_hooks_runner(node_list, hpath, phase, env):
  """Ask nodes to run the hooks runner.

  Args:
    - node_list: the nodes to contact
    - hpath: forwarded unchanged to the remote "hooks_runner"
    - phase: forwarded unchanged to the remote "hooks_runner"
    - env: a dictionary with the environment

  Returns:
    the dict of per-node results

  This is a multi-node call.

  """
  client = Client("hooks_runner", [hpath, phase, env])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
681

    
682

    
683
def call_blockdev_snapshot(node, cf_bdev):
  """Ask a node to snapshot the given block device.

  Args:
    - node: the node to contact
    - cf_bdev: the block device object, sent in its ToDict() form

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  params = [cf_bdev.ToDict()]
  client = Client("blockdev_snapshot", params)
  client.connect(node)
  client.run()
  return client.getresult().get(node, False)
693

    
694

    
695
def call_snapshot_export(node, snap_bdev, dest_node, instance):
  """Ask a node to export a given snapshot.

  Args:
    - node: the node to contact
    - snap_bdev: the snapshot device object, sent in its ToDict() form
    - dest_node: forwarded unchanged to the remote procedure
    - instance: the instance object, sent in its ToDict() form

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("snapshot_export",
                  [snap_bdev.ToDict(), dest_node, instance.ToDict()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
706

    
707

    
708
def call_finalize_export(node, instance, snap_disks):
  """Ask a node to complete an export operation.

  This writes the export config file, etc.

  Args:
    - node: the node to contact
    - instance: the instance object, sent in its ToDict() form
    - snap_disks: the snapshot disks; each is sent in its ToDict() form

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  # the disks are serialized first, then the instance
  disk_dicts = [snap.ToDict() for snap in snap_disks]
  client = Client("finalize_export", [instance.ToDict(), disk_dicts])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
724

    
725

    
726
def call_export_info(node, path):
  """Ask a node for the export information in a given path.

  Args:
    - node: the node to contact
    - path: forwarded unchanged to the remote "export_info"

  Returns:
    a false result (including the False used when no result was
    recorded for the node) is returned as is; anything else is passed
    through objects.SerializableConfigParser.Loads

  This is a single-node call.

  """
  client = Client("export_info", [path])
  client.connect(node)
  client.run()
  raw_info = client.getresult().get(node, False)
  if raw_info:
    return objects.SerializableConfigParser.Loads(raw_info)
  return raw_info
739

    
740

    
741
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
  """Ask a node to import a backup into an instance.

  Args:
    - node: the node to contact
    - inst: the instance object, sent in its ToDict() form
    - osdev, swapdev, src_node, src_image: forwarded unchanged, in
      this order

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("instance_os_import",
                  [inst.ToDict(), osdev, swapdev, src_node, src_image])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
752

    
753

    
754
def call_export_list(node_list):
  """Ask nodes for their list of stored exports.

  Args:
    - node_list: the nodes to contact

  Returns:
    the dict of per-node results

  This is a multi-node call.

  """
  client = Client("export_list", [])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
765

    
766

    
767
def call_export_remove(node, export):
  """Ask a node to remove a given export.

  Args:
    - node: the node to contact
    - export: forwarded unchanged to the remote "export_remove"

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("export_remove", [export])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
777

    
778

    
779
def call_node_leave_cluster(node):
  """Ask a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  dir.

  Returns:
    the node's result, or False if no result was recorded for it

  This is a single-node call.

  """
  client = Client("node_leave_cluster", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
792

    
793

    
794
def call_node_volumes(node_list):
  """Ask node(s) for all their volumes.

  Args:
    - node_list: the nodes to contact

  Returns:
    the dict of per-node results

  This is a multi-node call.

  """
  client = Client("node_volumes", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results