Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ decd5f45

History | View | Annotate | Download (18.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""RPC client: call remote procedures on cluster nodes and collect the results
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29

    
30
from twisted.internet.pollreactor import PollReactor
31

    
32
class ReReactor(PollReactor):
  """PollReactor variant whose run() may be invoked more than once.

  """
  def run(self, installSignalHandlers=1):
    """Restore the shutdown machinery if needed, then run the reactor.

    On a second run the 'shutdown' event triggers are expected to be
    gone and the threadpool to be stopped; both are put back in place
    before delegating to PollReactor.run.

    Args:
      - installSignalHandlers: passed unchanged to PollReactor.run

    Returns:
      Whatever PollReactor.run returns.

    """
    if 'shutdown' not in self._eventTriggers:
      # no shutdown triggers are registered, so this is most probably
      # not the first run: register the crash/disconnect handlers again
      for handler in (self.crash, self.disconnectAll):
        self.addSystemEventTrigger('during', 'shutdown', handler)

    pool = self.threadpool
    if pool is not None and pool.joined == 1:
      # the threadpool was stopped earlier; start it again and arrange
      # for it to be stopped when the reactor shuts down
      pool.start()
      self.addSystemEventTrigger('during', 'shutdown', pool.stop)

    return PollReactor.run(self, installSignalHandlers)
57

    
58

    
59
import twisted.internet.main
60
twisted.internet.main.installReactor(ReReactor())
61

    
62
from twisted.spread import pb
63
from twisted.internet import reactor
64
from twisted.cred import credentials
65
from OpenSSL import SSL, crypto
66

    
67
from ganeti import logger
68
from ganeti import utils
69
from ganeti import errors
70
from ganeti import constants
71
from ganeti import objects
72
from ganeti import ssconf
73

    
74
class NodeController:
  """Per-node state holder for one RPC call.

  One instance is created for every node that is contacted. It pairs
  the owning client (parent) with the node name, so that the callbacks
  below know where to store their outcome.

  """
  def __init__(self, parent, node):
    """Remember the owning client and the node this instance serves.

    """
    self.node = node
    self.parent = parent

  def _check_end(self):
    """Stop the reactor once there is a result for every node.

    """
    owner = self.parent
    if len(owner.results) == len(owner.nc):
      reactor.stop()

  def _record_failure(self):
    """Store False as this node's result and check for completion.

    """
    self.parent.results[self.node] = False
    self._check_end()

  def cb_call(self, obj):
    """Callback for successful connect.

    Once the connect and login sequence has succeeded, the parent's
    procedure is invoked on the remote object (with the parent's
    argument list) and the result/error callbacks are attached.

    """
    pending = obj.callRemote(self.parent.procedure, self.parent.args)
    pending.addCallbacks(self.cb_done, self.cb_err2)

  def cb_done(self, result):
    """Callback for a call that returned.

    The result is unpacked as a (traceback, payload) pair. The payload
    is stored as this node's result in all cases; afterwards, if the
    traceback is not empty, a generic RemoteError carrying it is raised
    (the actual remote exception can't be passed over yet).

    """
    remote_tb, payload = result
    self.parent.results[self.node] = payload
    self._check_end()
    if not remote_tb:
      return
    raise errors.RemoteError("Remote procedure error calling %s on %s:"
                             "\n%s" % (self.parent.procedure,
                                       self.node,
                                       remote_tb))

  def cb_err1(self, reason):
    """Error callback for unsuccessful connect.

    Logs the failure and records False as this node's result.

    """
    logger.Error("caller_connect: could not connect to remote host %s,"
                 " reason %s" % (self.node, reason))
    self._record_failure()

  def cb_err2(self, reason):
    """Error callback for unsuccessful call.

    This is used when the call didn't return anything, not even an
    error, or when it timed out, etc. The failure is logged and False
    is recorded as this node's result.

    """
    logger.Error("caller_call: could not call %s on node %s,"
                 " reason %s" % (self.parent.procedure, self.node, reason))
    self._record_failure()
140

    
141

    
142
class MirrorContextFactory:
143
  """Certificate verifier factory.
144

145
  This factory creates contexts that verify if the remote end has a
146
  specific certificate (i.e. our own certificate).
147

148
  The checks we do are that the PEM dump of the certificate is the
149
  same as our own and (somewhat redundantly) that the SHA checksum is
150
  the same.
151

152
  """
153
  isClient = 1
154

    
155
  def __init__(self):
156
    try:
157
      fd = open(constants.SSL_CERT_FILE, 'r')
158
      try:
159
        data = fd.read(16384)
160
      finally:
161
        fd.close()
162
    except EnvironmentError, err:
163
      raise errors.ConfigurationError("missing SSL certificate: %s" %
164
                                      str(err))
165
    self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166
    self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167
    self.mydigest = self.mycert.digest('SHA')
168

    
169
  def verifier(self, conn, x509, errno, err_depth, retcode):
170
    """Certificate verify method.
171

172
    """
173
    if self.mydigest != x509.digest('SHA'):
174
      return False
175
    if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
176
      return False
177
    return True
178

    
179
  def getContext(self):
180
    """Context generator.
181

182
    """
183
    context = SSL.Context(SSL.TLSv1_METHOD)
184
    context.set_verify(SSL.VERIFY_PEER, self.verifier)
185
    return context
186

    
187
class Client:
  """RPC Client class.

  Given a (remote) method name and a list of parameters, this class
  contacts (in parallel) all the nodes added via connect() or
  connect_list(), and builds a dict of results (key: node name,
  value: result).

  One current bug is that a generic failure is still signalled by a
  'False' result, which is not good. This overloading of values can
  cause bugs.

  """
  # class-level defaults; none of them is used by the methods below
  result_set = False
  result = False
  allresult = []

  def __init__(self, procedure, args):
    """Store the call details and read the node daemon settings.

    Args:
      - procedure: name of the remote procedure to call
      - args: list of arguments for the remote procedure

    """
    self.procedure = procedure
    self.args = args
    self.nc = {}
    self.results = {}
    store = ssconf.SimpleStore()
    self.port = store.GetNodeDaemonPort()
    self.nodepw = store.GetNodeDaemonPassword()

  #--- generic connector -------------

  def connect_list(self, node_list):
    """Add a list of nodes to the target nodes.

    """
    for name in node_list:
      self.connect(name)

  def connect(self, connect_node):
    """Add a node to the target list.

    A NodeController is registered for the node, an SSL connection to
    the node daemon port is requested, and a login as "master_node" is
    started whose outcome is handled by the controller's callbacks.

    """
    factory = pb.PBClientFactory()
    controller = NodeController(self, connect_node)
    self.nc[connect_node] = controller
    reactor.connectSSL(connect_node, self.port, factory,
                       MirrorContextFactory())
    login = factory.login(credentials.UsernamePassword("master_node",
                                                       self.nodepw))
    login.addCallbacks(controller.cb_call, controller.cb_err1)

  def getresult(self):
    """Return the results of the call.

    Returns:
      The dict of results (key: node name, value: result).

    """
    return self.results

  def run(self):
    """Wrapper over reactor.run().

    The reactor is only run if at least one node has been added;
    otherwise this does nothing.

    """
    if not self.nc:
      return
    reactor.run()
248

    
249

    
250
def call_volume_list(node_list, vg_name):
  """Gets the logical volumes present in a given volume group.

  The remote "volume_list" procedure receives vg_name as its only
  argument.

  This is a multi-node call.

  Returns:
    A dict of results (key: node name, value: result).

  """
  client = Client("volume_list", [vg_name])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
260

    
261

    
262
def call_vg_list(node_list):
  """Gets the volume group list.

  The remote "vg_list" procedure is called without arguments.

  This is a multi-node call.

  Returns:
    A dict of results (key: node name, value: result).

  """
  client = Client("vg_list", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
272

    
273

    
274
def call_bridges_exist(node, bridges_list):
  """Checks if a node has all the bridges given.

  The node is asked whether all the bridges in bridges_list are
  present, so that an instance that uses interfaces on those bridges
  can be started.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("bridges_exist", [bridges_list])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
288

    
289

    
290
def call_instance_start(node, instance, extra_args):
  """Starts an instance.

  The instance is sent in its serialized (Dumps) form, followed by
  extra_args.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  params = [instance.Dumps(), extra_args]
  client = Client("instance_start", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
300

    
301

    
302
def call_instance_shutdown(node, instance):
  """Stops an instance.

  The instance is sent in its serialized (Dumps) form.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  params = [instance.Dumps()]
  client = Client("instance_shutdown", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
312

    
313

    
314
def call_instance_os_add(node, inst, osdev, swapdev):
  """Installs an OS on the given instance.

  The remote "instance_os_add" procedure receives the serialized
  instance, osdev and swapdev.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("instance_os_add", [inst.Dumps(), osdev, swapdev])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
325

    
326

    
327
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
  """Run the OS rename script for an instance.

  The remote "instance_run_rename" procedure receives the serialized
  instance, old_name, osdev and swapdev.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("instance_run_rename",
                  [inst.Dumps(), old_name, osdev, swapdev])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
338

    
339

    
340
def call_instance_info(node, instance):
  """Returns information about a single instance.

  The instance argument is passed to the remote "instance_info"
  procedure as given (it is not serialized here).

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("instance_info", [instance])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
350

    
351

    
352
def call_all_instances_info(node_list):
  """Returns information about all instances on the given nodes.

  The remote "all_instances_info" procedure is called without
  arguments.

  This is a multi-node call.

  Returns:
    A dict of results (key: node name, value: result).

  """
  client = Client("all_instances_info", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
362

    
363

    
364
def call_instance_list(node_list):
  """Returns the list of running instances on the given nodes.

  The remote "instance_list" procedure is called without arguments.

  This is a multi-node call.

  Returns:
    A dict of results (key: node name, value: result).

  """
  client = Client("instance_list", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
374

    
375

    
376
def call_node_info(node_list, vg_name):
  """Return node information.

  This asks for memory information and volume group size and free
  space, passing vg_name to the remote "node_info" procedure.

  This is a multi-node call.

  Returns:
    A dict of results (key: node name, value: result).

  """
  client = Client("node_info", [vg_name])
  client.connect_list(node_list)
  client.run()
  all_info = client.getresult()

  for node_name in all_info:
    node_info = all_info.get(node_name, False)
    if type(node_info) is not dict:
      logger.Error("could not connect to node %s" % (node_name))
      # NOTE(review): this replacement dict is passed to CheckDict below
      # but never stored back into all_info, so the caller still sees
      # the original non-dict value for this node -- confirm whether
      # that is intended
      node_info = {}

    # presumably CheckDict fills in the keys missing from node_info
    # using these placeholder values -- verify against utils.CheckDict
    placeholders = {
      'memory_total': '-',
      'memory_dom0': '-',
      'memory_free': '-',
      'vg_size': 'node_unreachable',
      'vg_free': '-',
      }
    utils.CheckDict(node_info, placeholders, "call_node_info")
  return all_info
405

    
406

    
407
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
  """Add a node to the cluster.

  The six key arguments are passed, in the order given, to the remote
  "node_add" procedure.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("node_add", [dsa, dsapub, rsa, rsapub, ssh, sshpub])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
418

    
419

    
420
def call_node_verify(node_list, checkdict):
  """Request verification of given parameters.

  The remote "node_verify" procedure receives checkdict as its only
  argument.

  This is a multi-node call.

  Returns:
    A dict of results (key: node name, value: result).

  """
  client = Client("node_verify", [checkdict])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
430

    
431

    
432
def call_node_start_master(node):
  """Tells a node to activate itself as a master.

  The remote "node_start_master" procedure is called without
  arguments.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("node_start_master", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
442

    
443

    
444
def call_node_stop_master(node):
  """Tells a node to demote itself from master status.

  The remote "node_stop_master" procedure is called without
  arguments.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("node_stop_master", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
454

    
455

    
456
def call_version(node_list):
  """Query node version.

  The remote "version" procedure is called without arguments.

  This is a multi-node call.

  Returns:
    A dict of results (key: node name, value: result).

  """
  client = Client("version", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results
466

    
467

    
468
def call_blockdev_create(node, bdev, size, on_primary, info):
  """Request creation of a given block device.

  The remote "blockdev_create" procedure receives the serialized
  device, size, on_primary and info.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("blockdev_create",
                  [bdev.Dumps(), size, on_primary, info])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
479

    
480

    
481
def call_blockdev_remove(node, bdev):
  """Request removal of a given block device.

  The device is sent in its serialized (Dumps) form.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  params = [bdev.Dumps()]
  client = Client("blockdev_remove", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
491

    
492

    
493
def call_blockdev_assemble(node, disk, on_primary):
  """Request assembling of a given block device.

  The remote "blockdev_assemble" procedure receives the serialized
  disk and on_primary.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("blockdev_assemble", [disk.Dumps(), on_primary])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
504

    
505

    
506
def call_blockdev_shutdown(node, disk):
  """Request shutdown of a given block device.

  The disk is sent in its serialized (Dumps) form.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  params = [disk.Dumps()]
  client = Client("blockdev_shutdown", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
516

    
517

    
518
def call_blockdev_addchild(node, bdev, ndev):
  """Request adding a new child to a (mirroring) device.

  Both the device (bdev) and the new child (ndev) are sent in their
  serialized (Dumps) form.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("blockdev_addchild", [bdev.Dumps(), ndev.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
529

    
530

    
531
def call_blockdev_removechild(node, bdev, ndev):
  """Request removing a child from a (mirroring) device.

  Both the device (bdev) and the child (ndev) are sent in their
  serialized (Dumps) form.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("blockdev_removechild", [bdev.Dumps(), ndev.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
542

    
543

    
544
def call_blockdev_getmirrorstatus(node, disks):
  """Request status of (mirroring) devices.

  Every entry of disks is serialized (Dumps) and the resulting list is
  used as the argument list of the remote "blockdev_getmirrorstatus"
  procedure.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  serialized = [disk.Dumps() for disk in disks]
  client = Client("blockdev_getmirrorstatus", serialized)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
555

    
556

    
557
def call_blockdev_find(node, disk):
  """Request identification of a given block device.

  The disk is sent in its serialized (Dumps) form.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  params = [disk.Dumps()]
  client = Client("blockdev_find", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
567

    
568

    
569
def call_upload_file(node_list, file_name):
  """Upload a file.

  The file contents are sent together with the file's mode, owner,
  group, access time and modification time.

  The node will refuse the operation in case the file is not on the
  approved file list.

  This is a multi-node call.

  Args:
    - node_list: the nodes to send the file to
    - file_name: path of the local file to read; it is also sent as
                 the first argument of the remote procedure

  Returns:
    A dict of results (key: node name, value: result).

  Raises:
    EnvironmentError if the file cannot be opened, read or stat'ed.

  """
  fh = open(file_name)
  try:
    data = fh.read()
    # stat the descriptor the data was read from, so that the metadata
    # describes the very same file even if the path is replaced in the
    # meantime
    st = os.fstat(fh.fileno())
  finally:
    fh.close()
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
            st.st_atime, st.st_mtime]
  c = Client("upload_file", params)
  c.connect_list(node_list)
  c.run()
  return c.getresult()
590

    
591

    
592
def call_os_diagnose(node_list):
  """Request a diagnose of OS definitions.

  For every node, the non-empty entries of its result are converted:
  strings are loaded as ConfigObject instances and 2-element tuples
  are turned into InvalidOS errors. A node with an empty (or failed)
  result gets an empty list.

  This is a multi-node call.

  Returns:
    A dict (key: node name, value: list of converted entries).

  Raises:
    errors.ProgrammerError if a non-empty entry is neither a string
    nor a 2-element tuple.

  """
  client = Client("os_diagnose", [])
  client.connect_list(node_list)
  client.run()
  raw = client.getresult()
  converted = {}
  for node_name in raw:
    entries = []
    for item in raw[node_name] or ():
      if not item:
        continue
      if isinstance(item, basestring):
        entries.append(objects.ConfigObject.Loads(item))
      elif isinstance(item, tuple) and len(item) == 2:
        entries.append(errors.InvalidOS(item[0], item[1]))
      else:
        raise errors.ProgrammerError("Invalid data from"
                                     " xcserver.os_diagnose")
    converted[node_name] = entries
  return converted
617

    
618

    
619
def call_os_get(node_list, name):
  """Returns an OS definition.

  Per node, a string result is loaded as a ConfigObject, a 2-element
  tuple is turned into an InvalidOS error, and anything else is
  passed through unchanged.

  This is a multi-node call.

  Returns:
    A dict (key: node name, value: converted result).

  """
  client = Client("os_get", [name])
  client.connect_list(node_list)
  client.run()
  raw = client.getresult()
  converted = {}
  for node_name in raw:
    value = raw[node_name]
    if isinstance(value, basestring):
      value = objects.ConfigObject.Loads(value)
    elif isinstance(value, tuple) and len(value) == 2:
      value = errors.InvalidOS(value[0], value[1])
    converted[node_name] = value
  return converted
639

    
640

    
641
def call_hooks_runner(node_list, hpath, phase, env):
  """Call the hooks runner.

  Args:
    - node_list: the nodes on which to call the hooks runner
    - hpath: first argument of the remote "hooks_runner" procedure
             (presumably the hooks path -- verify against the node
             daemon)
    - phase: second argument of the remote procedure (presumably the
             hooks phase -- verify against the node daemon)
    - env: a dictionary with the environment

  This is a multi-node call.

  Returns:
    A dict of results (key: node name, value: result).

  """
  client = Client("hooks_runner", [hpath, phase, env])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
657

    
658

    
659
def call_blockdev_snapshot(node, cf_bdev):
  """Request a snapshot of the given block device.

  The device is sent in its serialized (Dumps) form.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  params = [cf_bdev.Dumps()]
  client = Client("blockdev_snapshot", params)
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
669

    
670

    
671
def call_snapshot_export(node, snap_bdev, dest_node, instance):
  """Request the export of a given snapshot.

  The remote "snapshot_export" procedure receives the serialized
  snapshot device, dest_node and the serialized instance.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("snapshot_export",
                  [snap_bdev.Dumps(), dest_node, instance.Dumps()])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
682

    
683

    
684
def call_finalize_export(node, instance, snap_disks):
  """Request the completion of an export operation.

  This writes the export config file, etc.

  The remote "finalize_export" procedure receives the serialized
  instance and a list with the serialized form of every disk in
  snap_disks.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  flat_disks = [disk.Dumps() for disk in snap_disks]
  client = Client("finalize_export", [instance.Dumps(), flat_disks])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
700

    
701

    
702
def call_export_info(node, path):
  """Queries the export information in a given path.

  This is a single-node call.

  Returns:
    The node's result loaded through SerializableConfigParser.Loads;
    if the result is empty or missing (e.g. False), it is returned
    as is.

  """
  client = Client("export_info", [path])
  client.connect(node)
  client.run()
  raw = client.getresult().get(node, False)
  if raw:
    return objects.SerializableConfigParser.Loads(raw)
  return raw
715

    
716

    
717
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
  """Request the import of a backup into an instance.

  The remote "instance_os_import" procedure receives the serialized
  instance, osdev, swapdev, src_node and src_image.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("instance_os_import",
                  [inst.Dumps(), osdev, swapdev, src_node, src_image])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
728

    
729

    
730
def call_export_list(node_list):
  """Gets the stored exports list.

  The remote "export_list" procedure is called without arguments.

  This is a multi-node call.

  Returns:
    A dict of results (key: node name, value: result).

  """
  client = Client("export_list", [])
  client.connect_list(node_list)
  client.run()
  return client.getresult()
741

    
742

    
743
def call_export_remove(node, export):
  """Requests removal of a given export.

  The export argument is passed to the remote "export_remove"
  procedure as given.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("export_remove", [export])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
753

    
754

    
755
def call_node_leave_cluster(node):
  """Requests a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  dir.

  This is a single-node call.

  Returns:
    The node's result, or False if no result is available for it.

  """
  client = Client("node_leave_cluster", [])
  client.connect(node)
  client.run()
  results = client.getresult()
  return results.get(node, False)
768

    
769

    
770
def call_node_volumes(node_list):
  """Gets all volumes on node(s).

  The remote "node_volumes" procedure is called without arguments.

  This is a multi-node call.

  Returns:
    A dict of results (key: node name, value: result).

  """
  client = Client("node_volumes", [])
  client.connect_list(node_list)
  client.run()
  results = client.getresult()
  return results