Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 2f8598a5

History | View | Annotate | Download (18.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29

    
30
from twisted.internet.pollreactor import PollReactor
31

    
32
class ReReactor(PollReactor):
33
  """A re-startable Reactor implementation.
34

35
  """
36
  def run(self, installSignalHandlers=1):
37
    """Custom run method.
38

39
    This is customized run that, before calling Reactor.run, will
40
    reinstall the shutdown events and re-create the threadpool in case
41
    these are not present (as will happen on the second run of the
42
    reactor).
43

44
    """
45
    if not 'shutdown' in self._eventTriggers:
46
      # the shutdown queue has been killed, we are most probably
47
      # at the second run, thus recreate the queue
48
      self.addSystemEventTrigger('during', 'shutdown', self.crash)
49
      self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
50
    if self.threadpool is not None and self.threadpool.joined == 1:
51
      # in case the threadpool has been stopped, re-start it
52
      # and add a trigger to stop it at reactor shutdown
53
      self.threadpool.start()
54
      self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
55

    
56
    return PollReactor.run(self, installSignalHandlers)
57

    
58

    
59
import twisted.internet.main
60
twisted.internet.main.installReactor(ReReactor())
61

    
62
from twisted.spread import pb
63
from twisted.internet import reactor
64
from twisted.cred import credentials
65
from OpenSSL import SSL, crypto
66

    
67
from ganeti import logger
68
from ganeti import utils
69
from ganeti import errors
70
from ganeti import constants
71
from ganeti import objects
72
from ganeti import ssconf
73

    
74
class NodeController:
75
  """Node-handling class.
76

77
  For each node that we speak with, we create an instance of this
78
  class, so that we have a safe place to store the details of this
79
  individual call.
80

81
  """
82
  def __init__(self, parent, node):
83
    self.parent = parent
84
    self.node = node
85

    
86
  def _check_end(self):
87
    """Stop the reactor if we got all the results.
88

89
    """
90
    if len(self.parent.results) == len(self.parent.nc):
91
      reactor.stop()
92

    
93
  def cb_call(self, obj):
94
    """Callback for successful connect.
95

96
    If the connect and login sequence succeeded, we proceed with
97
    making the actual call.
98

99
    """
100
    deferred = obj.callRemote(self.parent.procedure, self.parent.args)
101
    deferred.addCallbacks(self.cb_done, self.cb_err2)
102

    
103
  def cb_done(self, result):
104
    """Callback for successful call.
105

106
    When we receive the result from a call, we check if it was an
107
    error and if so we raise a generic RemoteError (we can't pass yet
108
    the actual exception over). If there was no error, we store the
109
    result.
110

111
    """
112
    tb, self.parent.results[self.node] = result
113
    self._check_end()
114
    if tb:
115
      raise errors.RemoteError("Remote procedure error calling %s on %s:"
116
                               "\n%s" % (self.parent.procedure,
117
                                         self.node,
118
                                         tb))
119

    
120
  def cb_err1(self, reason):
121
    """Error callback for unsuccessful connect.
122

123
    """
124
    logger.Error("caller_connect: could not connect to remote host %s,"
125
                 " reason %s" % (self.node, reason))
126
    self.parent.results[self.node] = False
127
    self._check_end()
128

    
129
  def cb_err2(self, reason):
130
    """Error callback for unsuccessful call.
131

132
    This is when the call didn't return anything, not even an error,
133
    or when it time out, etc.
134

135
    """
136
    logger.Error("caller_call: could not call %s on node %s,"
137
                 " reason %s" % (self.parent.procedure, self.node, reason))
138
    self.parent.results[self.node] = False
139
    self._check_end()
140

    
141

    
142
class MirrorContextFactory:
143
  """Certificate verifier factory.
144

145
  This factory creates contexts that verify if the remote end has a
146
  specific certificate (i.e. our own certificate).
147

148
  The checks we do are that the PEM dump of the certificate is the
149
  same as our own and (somewhat redundantly) that the SHA checksum is
150
  the same.
151

152
  """
153
  isClient = 1
154

    
155
  def __init__(self):
156
    try:
157
      fd = open(constants.SSL_CERT_FILE, 'r')
158
      try:
159
        data = fd.read(16384)
160
      finally:
161
        fd.close()
162
    except EnvironmentError, err:
163
      raise errors.ConfigurationError("missing SSL certificate: %s" %
164
                                      str(err))
165
    self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166
    self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167
    self.mydigest = self.mycert.digest('SHA')
168

    
169
  def verifier(self, conn, x509, errno, err_depth, retcode):
170
    """Certificate verify method.
171

172
    """
173
    if self.mydigest != x509.digest('SHA'):
174
      return False
175
    if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
176
      return False
177
    return True
178

    
179
  def getContext(self):
180
    """Context generator.
181

182
    """
183
    context = SSL.Context(SSL.TLSv1_METHOD)
184
    context.set_verify(SSL.VERIFY_PEER, self.verifier)
185
    return context
186

    
187
class Client:
188
  """RPC Client class.
189

190
  This class, given a (remote) method name, a list of parameters and a
191
  list of nodes, will contact (in parallel) all nodes, and return a
192
  dict of results (key: node name, value: result).
193

194
  One current bug is that generic failure is still signalled by
195
  'False' result, which is not good. This overloading of values can
196
  cause bugs.
197

198
  """
199
  result_set = False
200
  result = False
201
  allresult = []
202

    
203
  def __init__(self, procedure, args):
204
    ss = ssconf.SimpleStore()
205
    self.port = ss.GetNodeDaemonPort()
206
    self.nodepw = ss.GetNodeDaemonPassword()
207
    self.nc = {}
208
    self.results = {}
209
    self.procedure = procedure
210
    self.args = args
211

    
212
  #--- generic connector -------------
213

    
214
  def connect_list(self, node_list):
215
    """Add a list of nodes to the target nodes.
216

217
    """
218
    for node in node_list:
219
      self.connect(node)
220

    
221
  def connect(self, connect_node):
222
    """Add a node to the target list.
223

224
    """
225
    factory = pb.PBClientFactory()
226
    self.nc[connect_node] = nc = NodeController(self, connect_node)
227
    reactor.connectSSL(connect_node, self.port, factory,
228
                       MirrorContextFactory())
229
    #d = factory.getRootObject()
230
    d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
231
    d.addCallbacks(nc.cb_call, nc.cb_err1)
232

    
233
  def getresult(self):
234
    """Return the results of the call.
235

236
    """
237
    return self.results
238

    
239
  def run(self):
240
    """Wrapper over reactor.run().
241

242
    This function simply calls reactor.run() if we have any requests
243
    queued, otherwise it does nothing.
244

245
    """
246
    if self.nc:
247
      reactor.run()
248

    
249

    
250
def call_volume_list(node_list, vg_name):
251
  """Gets the logical volumes present in a given volume group.
252

253
  This is a multi-node call.
254

255
  """
256
  c = Client("volume_list", [vg_name])
257
  c.connect_list(node_list)
258
  c.run()
259
  return c.getresult()
260

    
261

    
262
def call_vg_list(node_list):
263
  """Gets the volume group list.
264

265
  This is a multi-node call.
266

267
  """
268
  c = Client("vg_list", [])
269
  c.connect_list(node_list)
270
  c.run()
271
  return c.getresult()
272

    
273

    
274
def call_bridges_exist(node, bridges_list):
275
  """Checks if a node has all the bridges given.
276

277
  This method checks if all bridges given in the bridges_list are
278
  present on the remote node, so that an instance that uses interfaces
279
  on those bridges can be started.
280

281
  This is a single-node call.
282

283
  """
284
  c = Client("bridges_exist", [bridges_list])
285
  c.connect(node)
286
  c.run()
287
  return c.getresult().get(node, False)
288

    
289

    
290
def call_instance_start(node, instance, extra_args):
291
  """Starts an instance.
292

293
  This is a single-node call.
294

295
  """
296
  c = Client("instance_start", [instance.ToDict(), extra_args])
297
  c.connect(node)
298
  c.run()
299
  return c.getresult().get(node, False)
300

    
301

    
302
def call_instance_shutdown(node, instance):
303
  """Stops an instance.
304

305
  This is a single-node call.
306

307
  """
308
  c = Client("instance_shutdown", [instance.ToDict()])
309
  c.connect(node)
310
  c.run()
311
  return c.getresult().get(node, False)
312

    
313

    
314
def call_instance_os_add(node, inst, osdev, swapdev):
315
  """Installs an OS on the given instance.
316

317
  This is a single-node call.
318

319
  """
320
  params = [inst.ToDict(), osdev, swapdev]
321
  c = Client("instance_os_add", params)
322
  c.connect(node)
323
  c.run()
324
  return c.getresult().get(node, False)
325

    
326

    
327
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
328
  """Run the OS rename script for an instance.
329

330
  This is a single-node call.
331

332
  """
333
  params = [inst.ToDict(), old_name, osdev, swapdev]
334
  c = Client("instance_run_rename", params)
335
  c.connect(node)
336
  c.run()
337
  return c.getresult().get(node, False)
338

    
339

    
340
def call_instance_info(node, instance):
341
  """Returns information about a single instance.
342

343
  This is a single-node call.
344

345
  """
346
  c = Client("instance_info", [instance])
347
  c.connect(node)
348
  c.run()
349
  return c.getresult().get(node, False)
350

    
351

    
352
def call_all_instances_info(node_list):
353
  """Returns information about all instances on a given node.
354

355
  This is a single-node call.
356

357
  """
358
  c = Client("all_instances_info", [])
359
  c.connect_list(node_list)
360
  c.run()
361
  return c.getresult()
362

    
363

    
364
def call_instance_list(node_list):
365
  """Returns the list of running instances on a given node.
366

367
  This is a single-node call.
368

369
  """
370
  c = Client("instance_list", [])
371
  c.connect_list(node_list)
372
  c.run()
373
  return c.getresult()
374

    
375

    
376
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
377
  """Do a TcpPing on the remote node
378

379
  This is a single-node call.
380
  """
381
  c = Client("node_tcp_ping", [source, target, port, timeout,
382
                               live_port_needed])
383
  c.connect(node)
384
  c.run()
385
  return c.getresult().get(node, False)
386

    
387

    
388
def call_node_info(node_list, vg_name):
389
  """Return node information.
390

391
  This will return memory information and volume group size and free
392
  space.
393

394
  This is a multi-node call.
395

396
  """
397
  c = Client("node_info", [vg_name])
398
  c.connect_list(node_list)
399
  c.run()
400
  retux = c.getresult()
401

    
402
  for node_name in retux:
403
    ret = retux.get(node_name, False)
404
    if type(ret) != dict:
405
      logger.Error("could not connect to node %s" % (node_name))
406
      ret = {}
407

    
408
    utils.CheckDict(ret,
409
                    { 'memory_total' : '-',
410
                      'memory_dom0' : '-',
411
                      'memory_free' : '-',
412
                      'vg_size' : 'node_unreachable',
413
                      'vg_free' : '-' },
414
                    "call_node_info",
415
                    )
416
  return retux
417

    
418

    
419
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
420
  """Add a node to the cluster.
421

422
  This is a single-node call.
423

424
  """
425
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
426
  c = Client("node_add", params)
427
  c.connect(node)
428
  c.run()
429
  return c.getresult().get(node, False)
430

    
431

    
432
def call_node_verify(node_list, checkdict):
433
  """Request verification of given parameters.
434

435
  This is a multi-node call.
436

437
  """
438
  c = Client("node_verify", [checkdict])
439
  c.connect_list(node_list)
440
  c.run()
441
  return c.getresult()
442

    
443

    
444
def call_node_start_master(node):
445
  """Tells a node to activate itself as a master.
446

447
  This is a single-node call.
448

449
  """
450
  c = Client("node_start_master", [])
451
  c.connect(node)
452
  c.run()
453
  return c.getresult().get(node, False)
454

    
455

    
456
def call_node_stop_master(node):
457
  """Tells a node to demote itself from master status.
458

459
  This is a single-node call.
460

461
  """
462
  c = Client("node_stop_master", [])
463
  c.connect(node)
464
  c.run()
465
  return c.getresult().get(node, False)
466

    
467

    
468
def call_version(node_list):
469
  """Query node version.
470

471
  This is a multi-node call.
472

473
  """
474
  c = Client("version", [])
475
  c.connect_list(node_list)
476
  c.run()
477
  return c.getresult()
478

    
479

    
480
def call_blockdev_create(node, bdev, size, on_primary, info):
481
  """Request creation of a given block device.
482

483
  This is a single-node call.
484

485
  """
486
  params = [bdev.ToDict(), size, on_primary, info]
487
  c = Client("blockdev_create", params)
488
  c.connect(node)
489
  c.run()
490
  return c.getresult().get(node, False)
491

    
492

    
493
def call_blockdev_remove(node, bdev):
494
  """Request removal of a given block device.
495

496
  This is a single-node call.
497

498
  """
499
  c = Client("blockdev_remove", [bdev.ToDict()])
500
  c.connect(node)
501
  c.run()
502
  return c.getresult().get(node, False)
503

    
504

    
505
def call_blockdev_assemble(node, disk, on_primary):
506
  """Request assembling of a given block device.
507

508
  This is a single-node call.
509

510
  """
511
  params = [disk.ToDict(), on_primary]
512
  c = Client("blockdev_assemble", params)
513
  c.connect(node)
514
  c.run()
515
  return c.getresult().get(node, False)
516

    
517

    
518
def call_blockdev_shutdown(node, disk):
519
  """Request shutdown of a given block device.
520

521
  This is a single-node call.
522

523
  """
524
  c = Client("blockdev_shutdown", [disk.ToDict()])
525
  c.connect(node)
526
  c.run()
527
  return c.getresult().get(node, False)
528

    
529

    
530
def call_blockdev_addchild(node, bdev, ndev):
531
  """Request adding a new child to a (mirroring) device.
532

533
  This is a single-node call.
534

535
  """
536
  params = [bdev.ToDict(), ndev.ToDict()]
537
  c = Client("blockdev_addchild", params)
538
  c.connect(node)
539
  c.run()
540
  return c.getresult().get(node, False)
541

    
542

    
543
def call_blockdev_removechild(node, bdev, ndev):
544
  """Request removing a new child from a (mirroring) device.
545

546
  This is a single-node call.
547

548
  """
549
  params = [bdev.ToDict(), ndev.ToDict()]
550
  c = Client("blockdev_removechild", params)
551
  c.connect(node)
552
  c.run()
553
  return c.getresult().get(node, False)
554

    
555

    
556
def call_blockdev_getmirrorstatus(node, disks):
557
  """Request status of a (mirroring) device.
558

559
  This is a single-node call.
560

561
  """
562
  params = [dsk.ToDict() for dsk in disks]
563
  c = Client("blockdev_getmirrorstatus", params)
564
  c.connect(node)
565
  c.run()
566
  return c.getresult().get(node, False)
567

    
568

    
569
def call_blockdev_find(node, disk):
570
  """Request identification of a given block device.
571

572
  This is a single-node call.
573

574
  """
575
  c = Client("blockdev_find", [disk.ToDict()])
576
  c.connect(node)
577
  c.run()
578
  return c.getresult().get(node, False)
579

    
580

    
581
def call_upload_file(node_list, file_name):
582
  """Upload a file.
583

584
  The node will refuse the operation in case the file is not on the
585
  approved file list.
586

587
  This is a multi-node call.
588

589
  """
590
  fh = file(file_name)
591
  try:
592
    data = fh.read()
593
  finally:
594
    fh.close()
595
  st = os.stat(file_name)
596
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
597
            st.st_atime, st.st_mtime]
598
  c = Client("upload_file", params)
599
  c.connect_list(node_list)
600
  c.run()
601
  return c.getresult()
602

    
603

    
604
def call_os_diagnose(node_list):
605
  """Request a diagnose of OS definitions.
606

607
  This is a multi-node call.
608

609
  """
610
  c = Client("os_diagnose", [])
611
  c.connect_list(node_list)
612
  c.run()
613
  result = c.getresult()
614
  new_result = {}
615
  for node_name in result:
616
    nr = []
617
    if result[node_name]:
618
      for data in result[node_name]:
619
        if data:
620
          if isinstance(data, dict):
621
            nr.append(objects.OS.FromDict(data))
622
          elif isinstance(data, tuple) and len(data) == 3:
623
            nr.append(errors.InvalidOS(data[0], data[1], data[2]))
624
          else:
625
            raise errors.ProgrammerError("Invalid data from"
626
                                         " xcserver.os_diagnose")
627
    new_result[node_name] = nr
628
  return new_result
629

    
630

    
631
def call_os_get(node_list, name):
632
  """Returns an OS definition.
633

634
  This is a multi-node call.
635

636
  """
637
  c = Client("os_get", [name])
638
  c.connect_list(node_list)
639
  c.run()
640
  result = c.getresult()
641
  new_result = {}
642
  for node_name in result:
643
    data = result[node_name]
644
    if isinstance(data, dict):
645
      new_result[node_name] = objects.OS.FromDict(data)
646
    elif isinstance(data, tuple) and len(data) == 3:
647
      new_result[node_name] = errors.InvalidOS(data[0], data[1], data[2])
648
    else:
649
      new_result[node_name] = data
650
  return new_result
651

    
652

    
653
def call_hooks_runner(node_list, hpath, phase, env):
654
  """Call the hooks runner.
655

656
  Args:
657
    - op: the OpCode instance
658
    - env: a dictionary with the environment
659

660
  This is a multi-node call.
661

662
  """
663
  params = [hpath, phase, env]
664
  c = Client("hooks_runner", params)
665
  c.connect_list(node_list)
666
  c.run()
667
  result = c.getresult()
668
  return result
669

    
670

    
671
def call_blockdev_snapshot(node, cf_bdev):
672
  """Request a snapshot of the given block device.
673

674
  This is a single-node call.
675

676
  """
677
  c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
678
  c.connect(node)
679
  c.run()
680
  return c.getresult().get(node, False)
681

    
682

    
683
def call_snapshot_export(node, snap_bdev, dest_node, instance):
684
  """Request the export of a given snapshot.
685

686
  This is a single-node call.
687

688
  """
689
  params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
690
  c = Client("snapshot_export", params)
691
  c.connect(node)
692
  c.run()
693
  return c.getresult().get(node, False)
694

    
695

    
696
def call_finalize_export(node, instance, snap_disks):
697
  """Request the completion of an export operation.
698

699
  This writes the export config file, etc.
700

701
  This is a single-node call.
702

703
  """
704
  flat_disks = []
705
  for disk in snap_disks:
706
    flat_disks.append(disk.ToDict())
707
  params = [instance.ToDict(), flat_disks]
708
  c = Client("finalize_export", params)
709
  c.connect(node)
710
  c.run()
711
  return c.getresult().get(node, False)
712

    
713

    
714
def call_export_info(node, path):
715
  """Queries the export information in a given path.
716

717
  This is a single-node call.
718

719
  """
720
  c = Client("export_info", [path])
721
  c.connect(node)
722
  c.run()
723
  result = c.getresult().get(node, False)
724
  if not result:
725
    return result
726
  return objects.SerializableConfigParser.Loads(result)
727

    
728

    
729
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
730
  """Request the import of a backup into an instance.
731

732
  This is a single-node call.
733

734
  """
735
  params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
736
  c = Client("instance_os_import", params)
737
  c.connect(node)
738
  c.run()
739
  return c.getresult().get(node, False)
740

    
741

    
742
def call_export_list(node_list):
743
  """Gets the stored exports list.
744

745
  This is a multi-node call.
746

747
  """
748
  c = Client("export_list", [])
749
  c.connect_list(node_list)
750
  c.run()
751
  result = c.getresult()
752
  return result
753

    
754

    
755
def call_export_remove(node, export):
756
  """Requests removal of a given export.
757

758
  This is a single-node call.
759

760
  """
761
  c = Client("export_remove", [export])
762
  c.connect(node)
763
  c.run()
764
  return c.getresult().get(node, False)
765

    
766

    
767
def call_node_leave_cluster(node):
768
  """Requests a node to clean the cluster information it has.
769

770
  This will remove the configuration information from the ganeti data
771
  dir.
772

773
  This is a single-node call.
774

775
  """
776
  c = Client("node_leave_cluster", [])
777
  c.connect(node)
778
  c.run()
779
  return c.getresult().get(node, False)
780

    
781

    
782
def call_node_volumes(node_list):
783
  """Gets all volumes on node(s).
784

785
  This is a multi-node call.
786

787
  """
788
  c = Client("node_volumes", [])
789
  c.connect_list(node_list)
790
  c.run()
791
  return c.getresult()