Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 9f0e6b37

History | View | Annotate | Download (18.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29
import socket
30
import httplib
31

    
32
import simplejson
33

    
34
from ganeti import logger
35
from ganeti import utils
36
from ganeti import objects
37

    
38

    
39
class NodeController:
40
  """Node-handling class.
41

42
  For each node that we speak with, we create an instance of this
43
  class, so that we have a safe place to store the details of this
44
  individual call.
45

46
  """
47
  def __init__(self, parent, node):
48
    self.parent = parent
49
    self.node = node
50
    self.failed = False
51

    
52
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
53
    try:
54
      hc.connect()
55
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
56
                    skip_accept_encoding=True)
57
      hc.putheader('Content-Length', str(len(parent.body)))
58
      hc.endheaders()
59
      hc.send(parent.body)
60
    except socket.error, err:
61
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
62
      self.failed = True
63

    
64
  def get_response(self):
65
    """Try to process the response from the node.
66

67
    """
68
    if self.failed:
69
      # we already failed in connect
70
      return False
71
    resp = self.http_conn.getresponse()
72
    if resp.status != 200:
73
      return False
74
    try:
75
      length = int(resp.getheader('Content-Length', '0'))
76
    except ValueError:
77
      return False
78
    if not length:
79
      logger.Error("Zero-length reply from %s" % self.node)
80
      return False
81
    payload = resp.read(length)
82
    unload = simplejson.loads(payload)
83
    return unload
84

    
85

    
86
class Client:
87
  """RPC Client class.
88

89
  This class, given a (remote) method name, a list of parameters and a
90
  list of nodes, will contact (in parallel) all nodes, and return a
91
  dict of results (key: node name, value: result).
92

93
  One current bug is that generic failure is still signalled by
94
  'False' result, which is not good. This overloading of values can
95
  cause bugs.
96

97
  """
98
  result_set = False
99
  result = False
100
  allresult = []
101

    
102
  def __init__(self, procedure, args):
103
    self.port = utils.GetNodeDaemonPort()
104
    self.nodepw = utils.GetNodeDaemonPassword()
105
    self.nc = {}
106
    self.results = {}
107
    self.procedure = procedure
108
    self.args = args
109
    self.body = simplejson.dumps(args)
110

    
111
  #--- generic connector -------------
112

    
113
  def connect_list(self, node_list):
114
    """Add a list of nodes to the target nodes.
115

116
    """
117
    for node in node_list:
118
      self.connect(node)
119

    
120
  def connect(self, connect_node):
121
    """Add a node to the target list.
122

123
    """
124
    self.nc[connect_node] = nc = NodeController(self, connect_node)
125

    
126
  def getresult(self):
127
    """Return the results of the call.
128

129
    """
130
    return self.results
131

    
132
  def run(self):
133
    """Wrapper over reactor.run().
134

135
    This function simply calls reactor.run() if we have any requests
136
    queued, otherwise it does nothing.
137

138
    """
139
    for node, nc in self.nc.items():
140
      self.results[node] = nc.get_response()
141

    
142

    
143
def call_volume_list(node_list, vg_name):
144
  """Gets the logical volumes present in a given volume group.
145

146
  This is a multi-node call.
147

148
  """
149
  c = Client("volume_list", [vg_name])
150
  c.connect_list(node_list)
151
  c.run()
152
  return c.getresult()
153

    
154

    
155
def call_vg_list(node_list):
156
  """Gets the volume group list.
157

158
  This is a multi-node call.
159

160
  """
161
  c = Client("vg_list", [])
162
  c.connect_list(node_list)
163
  c.run()
164
  return c.getresult()
165

    
166

    
167
def call_bridges_exist(node, bridges_list):
168
  """Checks if a node has all the bridges given.
169

170
  This method checks if all bridges given in the bridges_list are
171
  present on the remote node, so that an instance that uses interfaces
172
  on those bridges can be started.
173

174
  This is a single-node call.
175

176
  """
177
  c = Client("bridges_exist", [bridges_list])
178
  c.connect(node)
179
  c.run()
180
  return c.getresult().get(node, False)
181

    
182

    
183
def call_instance_start(node, instance, extra_args):
184
  """Starts an instance.
185

186
  This is a single-node call.
187

188
  """
189
  c = Client("instance_start", [instance.ToDict(), extra_args])
190
  c.connect(node)
191
  c.run()
192
  return c.getresult().get(node, False)
193

    
194

    
195
def call_instance_shutdown(node, instance):
196
  """Stops an instance.
197

198
  This is a single-node call.
199

200
  """
201
  c = Client("instance_shutdown", [instance.ToDict()])
202
  c.connect(node)
203
  c.run()
204
  return c.getresult().get(node, False)
205

    
206

    
207
def call_instance_migrate(node, instance, target, live):
208
  """Migrate an instance.
209

210
  This is a single-node call.
211

212
  @type node: string
213
  @param node: the node on which the instance is currently running
214
  @type instance: instance object
215
  @param instance: the instance definition
216
  @type target: string
217
  @param target: the target node name
218
  @type live: boolean
219
  @param live: whether the migration should be done live or not (the
220
      interpretation of this parameter is left to the hypervisor)
221

222
  """
223
  c = Client("instance_migrate", [instance.ToDict(), target, live])
224
  c.connect(node)
225
  c.run()
226
  return c.getresult().get(node, False)
227

    
228

    
229
def call_instance_reboot(node, instance, reboot_type, extra_args):
230
  """Reboots an instance.
231

232
  This is a single-node call.
233

234
  """
235
  c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
236
  c.connect(node)
237
  c.run()
238
  return c.getresult().get(node, False)
239

    
240

    
241
def call_instance_os_add(node, inst, osdev, swapdev):
242
  """Installs an OS on the given instance.
243

244
  This is a single-node call.
245

246
  """
247
  params = [inst.ToDict(), osdev, swapdev]
248
  c = Client("instance_os_add", params)
249
  c.connect(node)
250
  c.run()
251
  return c.getresult().get(node, False)
252

    
253

    
254
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
255
  """Run the OS rename script for an instance.
256

257
  This is a single-node call.
258

259
  """
260
  params = [inst.ToDict(), old_name, osdev, swapdev]
261
  c = Client("instance_run_rename", params)
262
  c.connect(node)
263
  c.run()
264
  return c.getresult().get(node, False)
265

    
266

    
267
def call_instance_info(node, instance):
268
  """Returns information about a single instance.
269

270
  This is a single-node call.
271

272
  """
273
  c = Client("instance_info", [instance])
274
  c.connect(node)
275
  c.run()
276
  return c.getresult().get(node, False)
277

    
278

    
279
def call_all_instances_info(node_list):
280
  """Returns information about all instances on a given node.
281

282
  This is a single-node call.
283

284
  """
285
  c = Client("all_instances_info", [])
286
  c.connect_list(node_list)
287
  c.run()
288
  return c.getresult()
289

    
290

    
291
def call_instance_list(node_list):
292
  """Returns the list of running instances on a given node.
293

294
  This is a single-node call.
295

296
  """
297
  c = Client("instance_list", [])
298
  c.connect_list(node_list)
299
  c.run()
300
  return c.getresult()
301

    
302

    
303
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
304
  """Do a TcpPing on the remote node
305

306
  This is a single-node call.
307
  """
308
  c = Client("node_tcp_ping", [source, target, port, timeout,
309
                               live_port_needed])
310
  c.connect(node)
311
  c.run()
312
  return c.getresult().get(node, False)
313

    
314

    
315
def call_node_info(node_list, vg_name):
316
  """Return node information.
317

318
  This will return memory information and volume group size and free
319
  space.
320

321
  This is a multi-node call.
322

323
  """
324
  c = Client("node_info", [vg_name])
325
  c.connect_list(node_list)
326
  c.run()
327
  retux = c.getresult()
328

    
329
  for node_name in retux:
330
    ret = retux.get(node_name, False)
331
    if type(ret) != dict:
332
      logger.Error("could not connect to node %s" % (node_name))
333
      ret = {}
334

    
335
    utils.CheckDict(ret,
336
                    { 'memory_total' : '-',
337
                      'memory_dom0' : '-',
338
                      'memory_free' : '-',
339
                      'vg_size' : 'node_unreachable',
340
                      'vg_free' : '-' },
341
                    "call_node_info",
342
                    )
343
  return retux
344

    
345

    
346
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
347
  """Add a node to the cluster.
348

349
  This is a single-node call.
350

351
  """
352
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
353
  c = Client("node_add", params)
354
  c.connect(node)
355
  c.run()
356
  return c.getresult().get(node, False)
357

    
358

    
359
def call_node_verify(node_list, checkdict, cluster_name):
360
  """Request verification of given parameters.
361

362
  This is a multi-node call.
363

364
  """
365
  c = Client("node_verify", [checkdict, cluster_name])
366
  c.connect_list(node_list)
367
  c.run()
368
  return c.getresult()
369

    
370

    
371
def call_node_start_master(node, start_daemons):
372
  """Tells a node to activate itself as a master.
373

374
  This is a single-node call.
375

376
  """
377
  c = Client("node_start_master", [start_daemons])
378
  c.connect(node)
379
  c.run()
380
  return c.getresult().get(node, False)
381

    
382

    
383
def call_node_stop_master(node, stop_daemons):
384
  """Tells a node to demote itself from master status.
385

386
  This is a single-node call.
387

388
  """
389
  c = Client("node_stop_master", [stop_daemons])
390
  c.connect(node)
391
  c.run()
392
  return c.getresult().get(node, False)
393

    
394

    
395
def call_master_info(node_list):
396
  """Query master info.
397

398
  This is a multi-node call.
399

400
  """
401
  c = Client("master_info", [])
402
  c.connect_list(node_list)
403
  c.run()
404
  return c.getresult()
405

    
406

    
407
def call_version(node_list):
408
  """Query node version.
409

410
  This is a multi-node call.
411

412
  """
413
  c = Client("version", [])
414
  c.connect_list(node_list)
415
  c.run()
416
  return c.getresult()
417

    
418

    
419
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
420
  """Request creation of a given block device.
421

422
  This is a single-node call.
423

424
  """
425
  params = [bdev.ToDict(), size, owner, on_primary, info]
426
  c = Client("blockdev_create", params)
427
  c.connect(node)
428
  c.run()
429
  return c.getresult().get(node, False)
430

    
431

    
432
def call_blockdev_remove(node, bdev):
433
  """Request removal of a given block device.
434

435
  This is a single-node call.
436

437
  """
438
  c = Client("blockdev_remove", [bdev.ToDict()])
439
  c.connect(node)
440
  c.run()
441
  return c.getresult().get(node, False)
442

    
443

    
444
def call_blockdev_rename(node, devlist):
445
  """Request rename of the given block devices.
446

447
  This is a single-node call.
448

449
  """
450
  params = [(d.ToDict(), uid) for d, uid in devlist]
451
  c = Client("blockdev_rename", params)
452
  c.connect(node)
453
  c.run()
454
  return c.getresult().get(node, False)
455

    
456

    
457
def call_blockdev_assemble(node, disk, owner, on_primary):
458
  """Request assembling of a given block device.
459

460
  This is a single-node call.
461

462
  """
463
  params = [disk.ToDict(), owner, on_primary]
464
  c = Client("blockdev_assemble", params)
465
  c.connect(node)
466
  c.run()
467
  return c.getresult().get(node, False)
468

    
469

    
470
def call_blockdev_shutdown(node, disk):
471
  """Request shutdown of a given block device.
472

473
  This is a single-node call.
474

475
  """
476
  c = Client("blockdev_shutdown", [disk.ToDict()])
477
  c.connect(node)
478
  c.run()
479
  return c.getresult().get(node, False)
480

    
481

    
482
def call_blockdev_addchildren(node, bdev, ndevs):
483
  """Request adding a list of children to a (mirroring) device.
484

485
  This is a single-node call.
486

487
  """
488
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
489
  c = Client("blockdev_addchildren", params)
490
  c.connect(node)
491
  c.run()
492
  return c.getresult().get(node, False)
493

    
494

    
495
def call_blockdev_removechildren(node, bdev, ndevs):
496
  """Request removing a list of children from a (mirroring) device.
497

498
  This is a single-node call.
499

500
  """
501
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
502
  c = Client("blockdev_removechildren", params)
503
  c.connect(node)
504
  c.run()
505
  return c.getresult().get(node, False)
506

    
507

    
508
def call_blockdev_getmirrorstatus(node, disks):
509
  """Request status of a (mirroring) device.
510

511
  This is a single-node call.
512

513
  """
514
  params = [dsk.ToDict() for dsk in disks]
515
  c = Client("blockdev_getmirrorstatus", params)
516
  c.connect(node)
517
  c.run()
518
  return c.getresult().get(node, False)
519

    
520

    
521
def call_blockdev_find(node, disk):
522
  """Request identification of a given block device.
523

524
  This is a single-node call.
525

526
  """
527
  c = Client("blockdev_find", [disk.ToDict()])
528
  c.connect(node)
529
  c.run()
530
  return c.getresult().get(node, False)
531

    
532

    
533
def call_blockdev_close(node, disks):
534
  """Closes the given block devices.
535

536
  This is a single-node call.
537

538
  """
539
  params = [cf.ToDict() for cf in disks]
540
  c = Client("blockdev_close", params)
541
  c.connect(node)
542
  c.run()
543
  return c.getresult().get(node, False)
544

    
545

    
546
def call_upload_file(node_list, file_name):
547
  """Upload a file.
548

549
  The node will refuse the operation in case the file is not on the
550
  approved file list.
551

552
  This is a multi-node call.
553

554
  """
555
  fh = file(file_name)
556
  try:
557
    data = fh.read()
558
  finally:
559
    fh.close()
560
  st = os.stat(file_name)
561
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
562
            st.st_atime, st.st_mtime]
563
  c = Client("upload_file", params)
564
  c.connect_list(node_list)
565
  c.run()
566
  return c.getresult()
567

    
568

    
569
def call_os_diagnose(node_list):
570
  """Request a diagnose of OS definitions.
571

572
  This is a multi-node call.
573

574
  """
575
  c = Client("os_diagnose", [])
576
  c.connect_list(node_list)
577
  c.run()
578
  result = c.getresult()
579
  new_result = {}
580
  for node_name in result:
581
    if result[node_name]:
582
      nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
583
    else:
584
      nr = []
585
    new_result[node_name] = nr
586
  return new_result
587

    
588

    
589
def call_os_get(node, name):
590
  """Returns an OS definition.
591

592
  This is a single-node call.
593

594
  """
595
  c = Client("os_get", [name])
596
  c.connect(node)
597
  c.run()
598
  result = c.getresult().get(node, False)
599
  if isinstance(result, dict):
600
    return objects.OS.FromDict(result)
601
  else:
602
    return result
603

    
604

    
605
def call_hooks_runner(node_list, hpath, phase, env):
606
  """Call the hooks runner.
607

608
  Args:
609
    - op: the OpCode instance
610
    - env: a dictionary with the environment
611

612
  This is a multi-node call.
613

614
  """
615
  params = [hpath, phase, env]
616
  c = Client("hooks_runner", params)
617
  c.connect_list(node_list)
618
  c.run()
619
  result = c.getresult()
620
  return result
621

    
622

    
623
def call_iallocator_runner(node, name, idata):
624
  """Call an iallocator on a remote node
625

626
  Args:
627
    - name: the iallocator name
628
    - input: the json-encoded input string
629

630
  This is a single-node call.
631

632
  """
633
  params = [name, idata]
634
  c = Client("iallocator_runner", params)
635
  c.connect(node)
636
  c.run()
637
  result = c.getresult().get(node, False)
638
  return result
639

    
640

    
641
def call_blockdev_grow(node, cf_bdev, amount):
642
  """Request a snapshot of the given block device.
643

644
  This is a single-node call.
645

646
  """
647
  c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
648
  c.connect(node)
649
  c.run()
650
  return c.getresult().get(node, False)
651

    
652

    
653
def call_blockdev_snapshot(node, cf_bdev):
654
  """Request a snapshot of the given block device.
655

656
  This is a single-node call.
657

658
  """
659
  c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
660
  c.connect(node)
661
  c.run()
662
  return c.getresult().get(node, False)
663

    
664

    
665
def call_snapshot_export(node, snap_bdev, dest_node, instance, cluster_name):
666
  """Request the export of a given snapshot.
667

668
  This is a single-node call.
669

670
  """
671
  params = [snap_bdev.ToDict(), dest_node, instance.ToDict(), cluster_name]
672
  c = Client("snapshot_export", params)
673
  c.connect(node)
674
  c.run()
675
  return c.getresult().get(node, False)
676

    
677

    
678
def call_finalize_export(node, instance, snap_disks):
679
  """Request the completion of an export operation.
680

681
  This writes the export config file, etc.
682

683
  This is a single-node call.
684

685
  """
686
  flat_disks = []
687
  for disk in snap_disks:
688
    flat_disks.append(disk.ToDict())
689
  params = [instance.ToDict(), flat_disks]
690
  c = Client("finalize_export", params)
691
  c.connect(node)
692
  c.run()
693
  return c.getresult().get(node, False)
694

    
695

    
696
def call_export_info(node, path):
697
  """Queries the export information in a given path.
698

699
  This is a single-node call.
700

701
  """
702
  c = Client("export_info", [path])
703
  c.connect(node)
704
  c.run()
705
  result = c.getresult().get(node, False)
706
  if not result:
707
    return result
708
  return objects.SerializableConfigParser.Loads(str(result))
709

    
710

    
711
def call_instance_os_import(node, inst, osdev, swapdev,
712
                            src_node, src_image, cluster_name):
713
  """Request the import of a backup into an instance.
714

715
  This is a single-node call.
716

717
  """
718
  params = [inst.ToDict(), osdev, swapdev, src_node, src_image, cluster_name]
719
  c = Client("instance_os_import", params)
720
  c.connect(node)
721
  c.run()
722
  return c.getresult().get(node, False)
723

    
724

    
725
def call_export_list(node_list):
726
  """Gets the stored exports list.
727

728
  This is a multi-node call.
729

730
  """
731
  c = Client("export_list", [])
732
  c.connect_list(node_list)
733
  c.run()
734
  result = c.getresult()
735
  return result
736

    
737

    
738
def call_export_remove(node, export):
739
  """Requests removal of a given export.
740

741
  This is a single-node call.
742

743
  """
744
  c = Client("export_remove", [export])
745
  c.connect(node)
746
  c.run()
747
  return c.getresult().get(node, False)
748

    
749

    
750
def call_node_leave_cluster(node):
751
  """Requests a node to clean the cluster information it has.
752

753
  This will remove the configuration information from the ganeti data
754
  dir.
755

756
  This is a single-node call.
757

758
  """
759
  c = Client("node_leave_cluster", [])
760
  c.connect(node)
761
  c.run()
762
  return c.getresult().get(node, False)
763

    
764

    
765
def call_node_volumes(node_list):
766
  """Gets all volumes on node(s).
767

768
  This is a multi-node call.
769

770
  """
771
  c = Client("node_volumes", [])
772
  c.connect_list(node_list)
773
  c.run()
774
  return c.getresult()
775

    
776

    
777
def call_test_delay(node_list, duration):
778
  """Sleep for a fixed time on given node(s).
779

780
  This is a multi-node call.
781

782
  """
783
  c = Client("test_delay", [duration])
784
  c.connect_list(node_list)
785
  c.run()
786
  return c.getresult()
787

    
788

    
789
def call_file_storage_dir_create(node, file_storage_dir):
790
  """Create the given file storage directory.
791

792
  This is a single-node call.
793

794
  """
795
  c = Client("file_storage_dir_create", [file_storage_dir])
796
  c.connect(node)
797
  c.run()
798
  return c.getresult().get(node, False)
799

    
800

    
801
def call_file_storage_dir_remove(node, file_storage_dir):
802
  """Remove the given file storage directory.
803

804
  This is a single-node call.
805

806
  """
807
  c = Client("file_storage_dir_remove", [file_storage_dir])
808
  c.connect(node)
809
  c.run()
810
  return c.getresult().get(node, False)
811

    
812

    
813
def call_file_storage_dir_rename(node, old_file_storage_dir,
814
                                 new_file_storage_dir):
815
  """Rename file storage directory.
816

817
  This is a single-node call.
818

819
  """
820
  c = Client("file_storage_dir_rename",
821
             [old_file_storage_dir, new_file_storage_dir])
822
  c.connect(node)
823
  c.run()
824
  return c.getresult().get(node, False)
825

    
826

    
827
def call_jobqueue_update(node_list, file_name, content):
828
  """Update job queue.
829

830
  This is a multi-node call.
831

832
  """
833
  c = Client("jobqueue_update", [file_name, content])
834
  c.connect_list(node_list)
835
  c.run()
836
  result = c.getresult()
837
  return result
838

    
839

    
840
def call_jobqueue_purge(node):
841
  """Purge job queue.
842

843
  This is a single-node call.
844

845
  """
846
  c = Client("jobqueue_purge", [])
847
  c.connect(node)
848
  c.run()
849
  return c.getresult().get(node, False)
850

    
851

    
852
def call_jobqueue_rename(node_list, old, new):
853
  """Rename a job queue file.
854

855
  This is a multi-node call.
856

857
  """
858
  c = Client("jobqueue_rename", [old, new])
859
  c.connect_list(node_list)
860
  c.run()
861
  result = c.getresult()
862
  return result