Revision 3ef3c771 lib/rpc.py

b/lib/rpc.py
19 19
# 02110-1301, USA.
20 20

  
21 21

  
22
"""Script to show add a new node to the cluster
22
"""Inter-node RPC library.
23 23

  
24 24
"""
25 25

  
......
66 66
      logging.exception("Error connecting to node %s", node)
67 67
      self.failed = True
68 68

  
69
  def get_response(self):
69
  def GetResponse(self):
70 70
    """Try to process the response from the node.
71 71

  
72 72
    """
......
115 115

  
116 116
  #--- generic connector -------------
117 117

  
118
  def connect_list(self, node_list):
118
  def ConnectList(self, node_list):
119 119
    """Add a list of nodes to the target nodes.
120 120

  
121
    @type node_list: list
122
    @param node_list: the list of node names to connect
123

  
121 124
    """
122 125
    for node in node_list:
123
      self.connect(node)
126
      self.ConnectNode(node)
124 127

  
125
  def connect(self, connect_node):
128
  def ConnectNode(self, connect_node):
126 129
    """Add a node to the target list.
127 130

  
128 131
    """
129 132
    self.nc[connect_node] = nc = NodeController(self, connect_node)
130 133

  
131
  def getresult(self):
134
  def GetResults(self):
132 135
    """Return the results of the call.
133 136

  
134 137
    """
135 138
    return self.results
136 139

  
137
  def run(self):
138
    """Wrapper over reactor.run().
140
  def Run(self):
141
    """Gather results from the node controllers.
139 142

  
140
    This function simply calls reactor.run() if we have any requests
141
    queued, otherwise it does nothing.
143
    This function simply calls GetResponse() for each of our node
144
    controllers.
142 145

  
143 146
    """
144 147
    for node, nc in self.nc.items():
145
      self.results[node] = nc.get_response()
148
      self.results[node] = nc.GetResponse()
146 149

  
147 150

  
148 151
class RpcRunner(object):
......
184 187

  
185 188
    """
186 189
    c = Client("volume_list", [vg_name])
187
    c.connect_list(node_list)
188
    c.run()
189
    return c.getresult()
190
    c.ConnectList(node_list)
191
    c.Run()
192
    return c.GetResults()
190 193

  
191 194
  def call_vg_list(self, node_list):
192 195
    """Gets the volume group list.
......
195 198

  
196 199
    """
197 200
    c = Client("vg_list", [])
198
    c.connect_list(node_list)
199
    c.run()
200
    return c.getresult()
201
    c.ConnectList(node_list)
202
    c.Run()
203
    return c.GetResults()
201 204

  
202 205
  def call_bridges_exist(self, node, bridges_list):
203 206
    """Checks if a node has all the bridges given.
......
210 213

  
211 214
    """
212 215
    c = Client("bridges_exist", [bridges_list])
213
    c.connect(node)
214
    c.run()
215
    return c.getresult().get(node, False)
216
    c.ConnectNode(node)
217
    c.Run()
218
    return c.GetResults().get(node, False)
216 219

  
217 220
  def call_instance_start(self, node, instance, extra_args):
218 221
    """Starts an instance.
......
221 224

  
222 225
    """
223 226
    c = Client("instance_start", [self._InstDict(instance), extra_args])
224
    c.connect(node)
225
    c.run()
226
    return c.getresult().get(node, False)
227
    c.ConnectNode(node)
228
    c.Run()
229
    return c.GetResults().get(node, False)
227 230

  
228 231
  def call_instance_shutdown(self, node, instance):
229 232
    """Stops an instance.
......
232 235

  
233 236
    """
234 237
    c = Client("instance_shutdown", [self._InstDict(instance)])
235
    c.connect(node)
236
    c.run()
237
    return c.getresult().get(node, False)
238
    c.ConnectNode(node)
239
    c.Run()
240
    return c.GetResults().get(node, False)
238 241

  
239 242
  def call_instance_migrate(self, node, instance, target, live):
240 243
    """Migrate an instance.
......
253 256

  
254 257
    """
255 258
    c = Client("instance_migrate", [self._InstDict(instance), target, live])
256
    c.connect(node)
257
    c.run()
258
    return c.getresult().get(node, False)
259
    c.ConnectNode(node)
260
    c.Run()
261
    return c.GetResults().get(node, False)
259 262

  
260 263
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
261 264
    """Reboots an instance.
......
265 268
    """
266 269
    c = Client("instance_reboot", [self._InstDict(instance),
267 270
                                   reboot_type, extra_args])
268
    c.connect(node)
269
    c.run()
270
    return c.getresult().get(node, False)
271
    c.ConnectNode(node)
272
    c.Run()
273
    return c.GetResults().get(node, False)
271 274

  
272 275
  def call_instance_os_add(self, node, inst):
273 276
    """Installs an OS on the given instance.
......
277 280
    """
278 281
    params = [self._InstDict(inst)]
279 282
    c = Client("instance_os_add", params)
280
    c.connect(node)
281
    c.run()
282
    return c.getresult().get(node, False)
283
    c.ConnectNode(node)
284
    c.Run()
285
    return c.GetResults().get(node, False)
283 286

  
284 287
  def call_instance_run_rename(self, node, inst, old_name):
285 288
    """Run the OS rename script for an instance.
......
289 292
    """
290 293
    params = [self._InstDict(inst), old_name]
291 294
    c = Client("instance_run_rename", params)
292
    c.connect(node)
293
    c.run()
294
    return c.getresult().get(node, False)
295
    c.ConnectNode(node)
296
    c.Run()
297
    return c.GetResults().get(node, False)
295 298

  
296 299
  def call_instance_info(self, node, instance, hname):
297 300
    """Returns information about a single instance.
......
307 310

  
308 311
    """
309 312
    c = Client("instance_info", [instance, hname])
310
    c.connect(node)
311
    c.run()
312
    return c.getresult().get(node, False)
313
    c.ConnectNode(node)
314
    c.Run()
315
    return c.GetResults().get(node, False)
313 316

  
314 317
  def call_all_instances_info(self, node_list, hypervisor_list):
315 318
    """Returns information about all instances on the given nodes.
......
323 326

  
324 327
    """
325 328
    c = Client("all_instances_info", [hypervisor_list])
326
    c.connect_list(node_list)
327
    c.run()
328
    return c.getresult()
329
    c.ConnectList(node_list)
330
    c.Run()
331
    return c.GetResults()
329 332

  
330 333
  def call_instance_list(self, node_list, hypervisor_list):
331 334
    """Returns the list of running instances on a given node.
......
339 342

  
340 343
    """
341 344
    c = Client("instance_list", [hypervisor_list])
342
    c.connect_list(node_list)
343
    c.run()
344
    return c.getresult()
345
    c.ConnectList(node_list)
346
    c.Run()
347
    return c.GetResults()
345 348

  
346 349
  def call_node_tcp_ping(self, node, source, target, port, timeout,
347 350
                         live_port_needed):
......
352 355
    """
353 356
    c = Client("node_tcp_ping", [source, target, port, timeout,
354 357
                                 live_port_needed])
355
    c.connect(node)
356
    c.run()
357
    return c.getresult().get(node, False)
358
    c.ConnectNode(node)
359
    c.Run()
360
    return c.GetResults().get(node, False)
358 361

  
359 362
  def call_node_has_ip_address(self, node, address):
360 363
    """Checks if a node has the given IP address.
......
363 366

  
364 367
    """
365 368
    c = Client("node_has_ip_address", [address])
366
    c.connect(node)
367
    c.run()
368
    return c.getresult().get(node, False)
369
    c.ConnectNode(node)
370
    c.Run()
371
    return c.GetResults().get(node, False)
369 372

  
370 373
  def call_node_info(self, node_list, vg_name, hypervisor_type):
371 374
    """Return node information.
......
386 389

  
387 390
    """
388 391
    c = Client("node_info", [vg_name, hypervisor_type])
389
    c.connect_list(node_list)
390
    c.run()
391
    retux = c.getresult()
392
    c.ConnectList(node_list)
393
    c.Run()
394
    retux = c.GetResults()
392 395

  
393 396
    for node_name in retux:
394 397
      ret = retux.get(node_name, False)
......
414 417
    """
415 418
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
416 419
    c = Client("node_add", params)
417
    c.connect(node)
418
    c.run()
419
    return c.getresult().get(node, False)
420
    c.ConnectNode(node)
421
    c.Run()
422
    return c.GetResults().get(node, False)
420 423

  
421 424
  def call_node_verify(self, node_list, checkdict, cluster_name):
422 425
    """Request verification of given parameters.
......
425 428

  
426 429
    """
427 430
    c = Client("node_verify", [checkdict, cluster_name])
428
    c.connect_list(node_list)
429
    c.run()
430
    return c.getresult()
431
    c.ConnectList(node_list)
432
    c.Run()
433
    return c.GetResults()
431 434

  
432 435
  @staticmethod
433 436
  def call_node_start_master(node, start_daemons):
......
437 440

  
438 441
    """
439 442
    c = Client("node_start_master", [start_daemons])
440
    c.connect(node)
441
    c.run()
442
    return c.getresult().get(node, False)
443
    c.ConnectNode(node)
444
    c.Run()
445
    return c.GetResults().get(node, False)
443 446

  
444 447
  @staticmethod
445 448
  def call_node_stop_master(node, stop_daemons):
......
449 452

  
450 453
    """
451 454
    c = Client("node_stop_master", [stop_daemons])
452
    c.connect(node)
453
    c.run()
454
    return c.getresult().get(node, False)
455
    c.ConnectNode(node)
456
    c.Run()
457
    return c.GetResults().get(node, False)
455 458

  
456 459
  @staticmethod
457 460
  def call_master_info(node_list):
......
462 465
    """
463 466
    # TODO: should this method query down nodes?
464 467
    c = Client("master_info", [])
465
    c.connect_list(node_list)
466
    c.run()
467
    return c.getresult()
468
    c.ConnectList(node_list)
469
    c.Run()
470
    return c.GetResults()
468 471

  
469 472
  def call_version(self, node_list):
470 473
    """Query node version.
......
473 476

  
474 477
    """
475 478
    c = Client("version", [])
476
    c.connect_list(node_list)
477
    c.run()
478
    return c.getresult()
479
    c.ConnectList(node_list)
480
    c.Run()
481
    return c.GetResults()
479 482

  
480 483
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
481 484
    """Request creation of a given block device.
......
485 488
    """
486 489
    params = [bdev.ToDict(), size, owner, on_primary, info]
487 490
    c = Client("blockdev_create", params)
488
    c.connect(node)
489
    c.run()
490
    return c.getresult().get(node, False)
491
    c.ConnectNode(node)
492
    c.Run()
493
    return c.GetResults().get(node, False)
491 494

  
492 495
  def call_blockdev_remove(self, node, bdev):
493 496
    """Request removal of a given block device.
......
496 499

  
497 500
    """
498 501
    c = Client("blockdev_remove", [bdev.ToDict()])
499
    c.connect(node)
500
    c.run()
501
    return c.getresult().get(node, False)
502
    c.ConnectNode(node)
503
    c.Run()
504
    return c.GetResults().get(node, False)
502 505

  
503 506
  def call_blockdev_rename(self, node, devlist):
504 507
    """Request rename of the given block devices.
......
508 511
    """
509 512
    params = [(d.ToDict(), uid) for d, uid in devlist]
510 513
    c = Client("blockdev_rename", params)
511
    c.connect(node)
512
    c.run()
513
    return c.getresult().get(node, False)
514
    c.ConnectNode(node)
515
    c.Run()
516
    return c.GetResults().get(node, False)
514 517

  
515 518
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
516 519
    """Request assembling of a given block device.
......
520 523
    """
521 524
    params = [disk.ToDict(), owner, on_primary]
522 525
    c = Client("blockdev_assemble", params)
523
    c.connect(node)
524
    c.run()
525
    return c.getresult().get(node, False)
526
    c.ConnectNode(node)
527
    c.Run()
528
    return c.GetResults().get(node, False)
526 529

  
527 530
  def call_blockdev_shutdown(self, node, disk):
528 531
    """Request shutdown of a given block device.
......
531 534

  
532 535
    """
533 536
    c = Client("blockdev_shutdown", [disk.ToDict()])
534
    c.connect(node)
535
    c.run()
536
    return c.getresult().get(node, False)
537
    c.ConnectNode(node)
538
    c.Run()
539
    return c.GetResults().get(node, False)
537 540

  
538 541
  def call_blockdev_addchildren(self, node, bdev, ndevs):
539 542
    """Request adding a list of children to a (mirroring) device.
......
543 546
    """
544 547
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
545 548
    c = Client("blockdev_addchildren", params)
546
    c.connect(node)
547
    c.run()
548
    return c.getresult().get(node, False)
549
    c.ConnectNode(node)
550
    c.Run()
551
    return c.GetResults().get(node, False)
549 552

  
550 553
  def call_blockdev_removechildren(self, node, bdev, ndevs):
551 554
    """Request removing a list of children from a (mirroring) device.
......
555 558
    """
556 559
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
557 560
    c = Client("blockdev_removechildren", params)
558
    c.connect(node)
559
    c.run()
560
    return c.getresult().get(node, False)
561
    c.ConnectNode(node)
562
    c.Run()
563
    return c.GetResults().get(node, False)
561 564

  
562 565
  def call_blockdev_getmirrorstatus(self, node, disks):
563 566
    """Request status of a (mirroring) device.
......
567 570
    """
568 571
    params = [dsk.ToDict() for dsk in disks]
569 572
    c = Client("blockdev_getmirrorstatus", params)
570
    c.connect(node)
571
    c.run()
572
    return c.getresult().get(node, False)
573
    c.ConnectNode(node)
574
    c.Run()
575
    return c.GetResults().get(node, False)
573 576

  
574 577
  def call_blockdev_find(self, node, disk):
575 578
    """Request identification of a given block device.
......
578 581

  
579 582
    """
580 583
    c = Client("blockdev_find", [disk.ToDict()])
581
    c.connect(node)
582
    c.run()
583
    return c.getresult().get(node, False)
584
    c.ConnectNode(node)
585
    c.Run()
586
    return c.GetResults().get(node, False)
584 587

  
585 588
  def call_blockdev_close(self, node, disks):
586 589
    """Closes the given block devices.
......
590 593
    """
591 594
    params = [cf.ToDict() for cf in disks]
592 595
    c = Client("blockdev_close", params)
593
    c.connect(node)
594
    c.run()
595
    return c.getresult().get(node, False)
596
    c.ConnectNode(node)
597
    c.Run()
598
    return c.GetResults().get(node, False)
596 599

  
597 600
  @staticmethod
598 601
  def call_upload_file(node_list, file_name):
......
613 616
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
614 617
              st.st_atime, st.st_mtime]
615 618
    c = Client("upload_file", params)
616
    c.connect_list(node_list)
617
    c.run()
618
    return c.getresult()
619
    c.ConnectList(node_list)
620
    c.Run()
621
    return c.GetResults()
619 622

  
620 623
  def call_os_diagnose(self, node_list):
621 624
    """Request a diagnose of OS definitions.
......
624 627

  
625 628
    """
626 629
    c = Client("os_diagnose", [])
627
    c.connect_list(node_list)
628
    c.run()
629
    result = c.getresult()
630
    c.ConnectList(node_list)
631
    c.Run()
632
    result = c.GetResults()
630 633
    new_result = {}
631 634
    for node_name in result:
632 635
      if result[node_name]:
......
643 646

  
644 647
    """
645 648
    c = Client("os_get", [name])
646
    c.connect(node)
647
    c.run()
648
    result = c.getresult().get(node, False)
649
    c.ConnectNode(node)
650
    c.Run()
651
    result = c.GetResults().get(node, False)
649 652
    if isinstance(result, dict):
650 653
      return objects.OS.FromDict(result)
651 654
    else:
......
663 666
    """
664 667
    params = [hpath, phase, env]
665 668
    c = Client("hooks_runner", params)
666
    c.connect_list(node_list)
667
    c.run()
668
    result = c.getresult()
669
    c.ConnectList(node_list)
670
    c.Run()
671
    result = c.GetResults()
669 672
    return result
670 673

  
671 674
  def call_iallocator_runner(self, node, name, idata):
......
680 683
    """
681 684
    params = [name, idata]
682 685
    c = Client("iallocator_runner", params)
683
    c.connect(node)
684
    c.run()
685
    result = c.getresult().get(node, False)
686
    c.ConnectNode(node)
687
    c.Run()
688
    result = c.GetResults().get(node, False)
686 689
    return result
687 690

  
688 691
  def call_blockdev_grow(self, node, cf_bdev, amount):
......
692 695

  
693 696
    """
694 697
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
695
    c.connect(node)
696
    c.run()
697
    return c.getresult().get(node, False)
698
    c.ConnectNode(node)
699
    c.Run()
700
    return c.GetResults().get(node, False)
698 701

  
699 702
  def call_blockdev_snapshot(self, node, cf_bdev):
700 703
    """Request a snapshot of the given block device.
......
703 706

  
704 707
    """
705 708
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
706
    c.connect(node)
707
    c.run()
708
    return c.getresult().get(node, False)
709
    c.ConnectNode(node)
710
    c.Run()
711
    return c.GetResults().get(node, False)
709 712

  
710 713
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
711 714
                           cluster_name):
......
717 720
    params = [snap_bdev.ToDict(), dest_node,
718 721
              self._InstDict(instance), cluster_name]
719 722
    c = Client("snapshot_export", params)
720
    c.connect(node)
721
    c.run()
722
    return c.getresult().get(node, False)
723
    c.ConnectNode(node)
724
    c.Run()
725
    return c.GetResults().get(node, False)
723 726

  
724 727
  def call_finalize_export(self, node, instance, snap_disks):
725 728
    """Request the completion of an export operation.
......
734 737
      flat_disks.append(disk.ToDict())
735 738
    params = [self._InstDict(instance), flat_disks]
736 739
    c = Client("finalize_export", params)
737
    c.connect(node)
738
    c.run()
739
    return c.getresult().get(node, False)
740
    c.ConnectNode(node)
741
    c.Run()
742
    return c.GetResults().get(node, False)
740 743

  
741 744
  def call_export_info(self, node, path):
742 745
    """Queries the export information in a given path.
......
745 748

  
746 749
    """
747 750
    c = Client("export_info", [path])
748
    c.connect(node)
749
    c.run()
750
    result = c.getresult().get(node, False)
751
    c.ConnectNode(node)
752
    c.Run()
753
    result = c.GetResults().get(node, False)
751 754
    if not result:
752 755
      return result
753 756
    return objects.SerializableConfigParser.Loads(str(result))
......
762 765
    params = [self._InstDict(inst), osdev, swapdev,
763 766
              src_node, src_image, cluster_name]
764 767
    c = Client("instance_os_import", params)
765
    c.connect(node)
766
    c.run()
767
    return c.getresult().get(node, False)
768
    c.ConnectNode(node)
769
    c.Run()
770
    return c.GetResults().get(node, False)
768 771

  
769 772
  def call_export_list(self, node_list):
770 773
    """Gets the stored exports list.
......
773 776

  
774 777
    """
775 778
    c = Client("export_list", [])
776
    c.connect_list(node_list)
777
    c.run()
778
    result = c.getresult()
779
    c.ConnectList(node_list)
780
    c.Run()
781
    result = c.GetResults()
779 782
    return result
780 783

  
781 784
  def call_export_remove(self, node, export):
......
785 788

  
786 789
    """
787 790
    c = Client("export_remove", [export])
788
    c.connect(node)
789
    c.run()
790
    return c.getresult().get(node, False)
791
    c.ConnectNode(node)
792
    c.Run()
793
    return c.GetResults().get(node, False)
791 794

  
792 795
  @staticmethod
793 796
  def call_node_leave_cluster(node):
......
800 803

  
801 804
    """
802 805
    c = Client("node_leave_cluster", [])
803
    c.connect(node)
804
    c.run()
805
    return c.getresult().get(node, False)
806
    c.ConnectNode(node)
807
    c.Run()
808
    return c.GetResults().get(node, False)
806 809

  
807 810
  def call_node_volumes(self, node_list):
808 811
    """Gets all volumes on node(s).
......
811 814

  
812 815
    """
813 816
    c = Client("node_volumes", [])
814
    c.connect_list(node_list)
815
    c.run()
816
    return c.getresult()
817
    c.ConnectList(node_list)
818
    c.Run()
819
    return c.GetResults()
817 820

  
818 821
  def call_test_delay(self, node_list, duration):
819 822
    """Sleep for a fixed time on given node(s).
......
822 825

  
823 826
    """
824 827
    c = Client("test_delay", [duration])
825
    c.connect_list(node_list)
826
    c.run()
827
    return c.getresult()
828
    c.ConnectList(node_list)
829
    c.Run()
830
    return c.GetResults()
828 831

  
829 832
  def call_file_storage_dir_create(self, node, file_storage_dir):
830 833
    """Create the given file storage directory.
......
833 836

  
834 837
    """
835 838
    c = Client("file_storage_dir_create", [file_storage_dir])
836
    c.connect(node)
837
    c.run()
838
    return c.getresult().get(node, False)
839
    c.ConnectNode(node)
840
    c.Run()
841
    return c.GetResults().get(node, False)
839 842

  
840 843
  def call_file_storage_dir_remove(self, node, file_storage_dir):
841 844
    """Remove the given file storage directory.
......
844 847

  
845 848
    """
846 849
    c = Client("file_storage_dir_remove", [file_storage_dir])
847
    c.connect(node)
848
    c.run()
849
    return c.getresult().get(node, False)
850
    c.ConnectNode(node)
851
    c.Run()
852
    return c.GetResults().get(node, False)
850 853

  
851 854
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
852 855
                                   new_file_storage_dir):
......
857 860
    """
858 861
    c = Client("file_storage_dir_rename",
859 862
               [old_file_storage_dir, new_file_storage_dir])
860
    c.connect(node)
861
    c.run()
862
    return c.getresult().get(node, False)
863
    c.ConnectNode(node)
864
    c.Run()
865
    return c.GetResults().get(node, False)
863 866

  
864 867
  @staticmethod
865 868
  def call_jobqueue_update(node_list, file_name, content):
......
869 872

  
870 873
    """
871 874
    c = Client("jobqueue_update", [file_name, content])
872
    c.connect_list(node_list)
873
    c.run()
874
    result = c.getresult()
875
    c.ConnectList(node_list)
876
    c.Run()
877
    result = c.GetResults()
875 878
    return result
876 879

  
877 880
  @staticmethod
......
882 885

  
883 886
    """
884 887
    c = Client("jobqueue_purge", [])
885
    c.connect(node)
886
    c.run()
887
    return c.getresult().get(node, False)
888
    c.ConnectNode(node)
889
    c.Run()
890
    return c.GetResults().get(node, False)
888 891

  
889 892
  @staticmethod
890 893
  def call_jobqueue_rename(node_list, old, new):
......
894 897

  
895 898
    """
896 899
    c = Client("jobqueue_rename", [old, new])
897
    c.connect_list(node_list)
898
    c.run()
899
    result = c.getresult()
900
    c.ConnectList(node_list)
901
    c.Run()
902
    result = c.GetResults()
900 903
    return result
901 904

  
902 905

  
......
913 916

  
914 917
    """
915 918
    c = Client("jobqueue_set_drain", [drain_flag])
916
    c.connect_list(node_list)
917
    c.run()
918
    result = c.getresult()
919
    c.ConnectList(node_list)
920
    c.Run()
921
    result = c.GetResults()
919 922
    return result
920 923

  
921 924

  
......
935 938
    cluster = self._cfg.GetClusterInfo()
936 939
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
937 940
    c = Client("hypervisor_validate_params", [hvname, hv_full])
938
    c.connect_list(node_list)
939
    c.run()
940
    result = c.getresult()
941
    c.ConnectList(node_list)
942
    c.Run()
943
    result = c.GetResults()
941 944
    return result

Also available in: Unified diff