Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ 4e071d3b

History | View | Annotate | Download (18.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29
import socket
30
import httplib
31

    
32
import simplejson
33

    
34
from ganeti import logger
35
from ganeti import utils
36
from ganeti import objects
37
from ganeti import ssconf
38

    
39

    
40
class NodeController:
41
  """Node-handling class.
42

43
  For each node that we speak with, we create an instance of this
44
  class, so that we have a safe place to store the details of this
45
  individual call.
46

47
  """
48
  def __init__(self, parent, node):
49
    self.parent = parent
50
    self.node = node
51
    self.failed = False
52

    
53
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
54
    try:
55
      hc.connect()
56
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
57
                    skip_accept_encoding=True)
58
      hc.putheader('Content-Length', str(len(parent.body)))
59
      hc.endheaders()
60
      hc.send(parent.body)
61
    except socket.error, err:
62
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
63
      self.failed = True
64

    
65
  def get_response(self):
66
    """Try to process the response from the node.
67

68
    """
69
    if self.failed:
70
      # we already failed in connect
71
      return False
72
    resp = self.http_conn.getresponse()
73
    if resp.status != 200:
74
      return False
75
    try:
76
      length = int(resp.getheader('Content-Length', '0'))
77
    except ValueError:
78
      return False
79
    if not length:
80
      logger.Error("Zero-length reply from %s" % self.node)
81
      return False
82
    payload = resp.read(length)
83
    unload = simplejson.loads(payload)
84
    return unload
85

    
86

    
87
class Client:
88
  """RPC Client class.
89

90
  This class, given a (remote) method name, a list of parameters and a
91
  list of nodes, will contact (in parallel) all nodes, and return a
92
  dict of results (key: node name, value: result).
93

94
  One current bug is that generic failure is still signalled by
95
  'False' result, which is not good. This overloading of values can
96
  cause bugs.
97

98
  """
99
  result_set = False
100
  result = False
101
  allresult = []
102

    
103
  def __init__(self, procedure, args):
104
    ss = ssconf.SimpleStore()
105
    self.port = ss.GetNodeDaemonPort()
106
    self.nodepw = ss.GetNodeDaemonPassword()
107
    self.nc = {}
108
    self.results = {}
109
    self.procedure = procedure
110
    self.args = args
111
    self.body = simplejson.dumps(args)
112

    
113
  #--- generic connector -------------
114

    
115
  def connect_list(self, node_list):
116
    """Add a list of nodes to the target nodes.
117

118
    """
119
    for node in node_list:
120
      self.connect(node)
121

    
122
  def connect(self, connect_node):
123
    """Add a node to the target list.
124

125
    """
126
    self.nc[connect_node] = nc = NodeController(self, connect_node)
127

    
128
  def getresult(self):
129
    """Return the results of the call.
130

131
    """
132
    return self.results
133

    
134
  def run(self):
135
    """Wrapper over reactor.run().
136

137
    This function simply calls reactor.run() if we have any requests
138
    queued, otherwise it does nothing.
139

140
    """
141
    for node, nc in self.nc.items():
142
      self.results[node] = nc.get_response()
143

    
144

    
145
def call_volume_list(node_list, vg_name):
146
  """Gets the logical volumes present in a given volume group.
147

148
  This is a multi-node call.
149

150
  """
151
  c = Client("volume_list", [vg_name])
152
  c.connect_list(node_list)
153
  c.run()
154
  return c.getresult()
155

    
156

    
157
def call_vg_list(node_list):
158
  """Gets the volume group list.
159

160
  This is a multi-node call.
161

162
  """
163
  c = Client("vg_list", [])
164
  c.connect_list(node_list)
165
  c.run()
166
  return c.getresult()
167

    
168

    
169
def call_bridges_exist(node, bridges_list):
170
  """Checks if a node has all the bridges given.
171

172
  This method checks if all bridges given in the bridges_list are
173
  present on the remote node, so that an instance that uses interfaces
174
  on those bridges can be started.
175

176
  This is a single-node call.
177

178
  """
179
  c = Client("bridges_exist", [bridges_list])
180
  c.connect(node)
181
  c.run()
182
  return c.getresult().get(node, False)
183

    
184

    
185
def call_instance_start(node, instance, extra_args):
186
  """Starts an instance.
187

188
  This is a single-node call.
189

190
  """
191
  c = Client("instance_start", [instance.ToDict(), extra_args])
192
  c.connect(node)
193
  c.run()
194
  return c.getresult().get(node, False)
195

    
196

    
197
def call_instance_shutdown(node, instance):
198
  """Stops an instance.
199

200
  This is a single-node call.
201

202
  """
203
  c = Client("instance_shutdown", [instance.ToDict()])
204
  c.connect(node)
205
  c.run()
206
  return c.getresult().get(node, False)
207

    
208

    
209
def call_instance_migrate(node, instance, target, live):
210
  """Migrate an instance.
211

212
  This is a single-node call.
213

214
  """
215
  c = Client("instance_migrate", [instance.name, target, live])
216
  c.connect(node)
217
  c.run()
218
  return c.getresult().get(node, False)
219

    
220

    
221
def call_instance_reboot(node, instance, reboot_type, extra_args):
222
  """Reboots an instance.
223

224
  This is a single-node call.
225

226
  """
227
  c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
228
  c.connect(node)
229
  c.run()
230
  return c.getresult().get(node, False)
231

    
232

    
233
def call_instance_os_add(node, inst, osdev, swapdev):
234
  """Installs an OS on the given instance.
235

236
  This is a single-node call.
237

238
  """
239
  params = [inst.ToDict(), osdev, swapdev]
240
  c = Client("instance_os_add", params)
241
  c.connect(node)
242
  c.run()
243
  return c.getresult().get(node, False)
244

    
245

    
246
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
247
  """Run the OS rename script for an instance.
248

249
  This is a single-node call.
250

251
  """
252
  params = [inst.ToDict(), old_name, osdev, swapdev]
253
  c = Client("instance_run_rename", params)
254
  c.connect(node)
255
  c.run()
256
  return c.getresult().get(node, False)
257

    
258

    
259
def call_instance_info(node, instance):
260
  """Returns information about a single instance.
261

262
  This is a single-node call.
263

264
  """
265
  c = Client("instance_info", [instance])
266
  c.connect(node)
267
  c.run()
268
  return c.getresult().get(node, False)
269

    
270

    
271
def call_all_instances_info(node_list):
272
  """Returns information about all instances on a given node.
273

274
  This is a single-node call.
275

276
  """
277
  c = Client("all_instances_info", [])
278
  c.connect_list(node_list)
279
  c.run()
280
  return c.getresult()
281

    
282

    
283
def call_instance_list(node_list):
284
  """Returns the list of running instances on a given node.
285

286
  This is a single-node call.
287

288
  """
289
  c = Client("instance_list", [])
290
  c.connect_list(node_list)
291
  c.run()
292
  return c.getresult()
293

    
294

    
295
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
296
  """Do a TcpPing on the remote node
297

298
  This is a single-node call.
299
  """
300
  c = Client("node_tcp_ping", [source, target, port, timeout,
301
                               live_port_needed])
302
  c.connect(node)
303
  c.run()
304
  return c.getresult().get(node, False)
305

    
306

    
307
def call_node_info(node_list, vg_name):
308
  """Return node information.
309

310
  This will return memory information and volume group size and free
311
  space.
312

313
  This is a multi-node call.
314

315
  """
316
  c = Client("node_info", [vg_name])
317
  c.connect_list(node_list)
318
  c.run()
319
  retux = c.getresult()
320

    
321
  for node_name in retux:
322
    ret = retux.get(node_name, False)
323
    if type(ret) != dict:
324
      logger.Error("could not connect to node %s" % (node_name))
325
      ret = {}
326

    
327
    utils.CheckDict(ret,
328
                    { 'memory_total' : '-',
329
                      'memory_dom0' : '-',
330
                      'memory_free' : '-',
331
                      'vg_size' : 'node_unreachable',
332
                      'vg_free' : '-' },
333
                    "call_node_info",
334
                    )
335
  return retux
336

    
337

    
338
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
339
  """Add a node to the cluster.
340

341
  This is a single-node call.
342

343
  """
344
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
345
  c = Client("node_add", params)
346
  c.connect(node)
347
  c.run()
348
  return c.getresult().get(node, False)
349

    
350

    
351
def call_node_verify(node_list, checkdict):
352
  """Request verification of given parameters.
353

354
  This is a multi-node call.
355

356
  """
357
  c = Client("node_verify", [checkdict])
358
  c.connect_list(node_list)
359
  c.run()
360
  return c.getresult()
361

    
362

    
363
def call_node_start_master(node, start_daemons):
364
  """Tells a node to activate itself as a master.
365

366
  This is a single-node call.
367

368
  """
369
  c = Client("node_start_master", [start_daemons])
370
  c.connect(node)
371
  c.run()
372
  return c.getresult().get(node, False)
373

    
374

    
375
def call_node_stop_master(node, stop_daemons):
376
  """Tells a node to demote itself from master status.
377

378
  This is a single-node call.
379

380
  """
381
  c = Client("node_stop_master", [stop_daemons])
382
  c.connect(node)
383
  c.run()
384
  return c.getresult().get(node, False)
385

    
386

    
387
def call_master_info(node_list):
388
  """Query master info.
389

390
  This is a multi-node call.
391

392
  """
393
  c = Client("master_info", [])
394
  c.connect_list(node_list)
395
  c.run()
396
  return c.getresult()
397

    
398

    
399
def call_version(node_list):
400
  """Query node version.
401

402
  This is a multi-node call.
403

404
  """
405
  c = Client("version", [])
406
  c.connect_list(node_list)
407
  c.run()
408
  return c.getresult()
409

    
410

    
411
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
412
  """Request creation of a given block device.
413

414
  This is a single-node call.
415

416
  """
417
  params = [bdev.ToDict(), size, owner, on_primary, info]
418
  c = Client("blockdev_create", params)
419
  c.connect(node)
420
  c.run()
421
  return c.getresult().get(node, False)
422

    
423

    
424
def call_blockdev_remove(node, bdev):
425
  """Request removal of a given block device.
426

427
  This is a single-node call.
428

429
  """
430
  c = Client("blockdev_remove", [bdev.ToDict()])
431
  c.connect(node)
432
  c.run()
433
  return c.getresult().get(node, False)
434

    
435

    
436
def call_blockdev_rename(node, devlist):
437
  """Request rename of the given block devices.
438

439
  This is a single-node call.
440

441
  """
442
  params = [(d.ToDict(), uid) for d, uid in devlist]
443
  c = Client("blockdev_rename", params)
444
  c.connect(node)
445
  c.run()
446
  return c.getresult().get(node, False)
447

    
448

    
449
def call_blockdev_assemble(node, disk, owner, on_primary):
450
  """Request assembling of a given block device.
451

452
  This is a single-node call.
453

454
  """
455
  params = [disk.ToDict(), owner, on_primary]
456
  c = Client("blockdev_assemble", params)
457
  c.connect(node)
458
  c.run()
459
  return c.getresult().get(node, False)
460

    
461

    
462
def call_blockdev_shutdown(node, disk):
463
  """Request shutdown of a given block device.
464

465
  This is a single-node call.
466

467
  """
468
  c = Client("blockdev_shutdown", [disk.ToDict()])
469
  c.connect(node)
470
  c.run()
471
  return c.getresult().get(node, False)
472

    
473

    
474
def call_blockdev_addchildren(node, bdev, ndevs):
475
  """Request adding a list of children to a (mirroring) device.
476

477
  This is a single-node call.
478

479
  """
480
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
481
  c = Client("blockdev_addchildren", params)
482
  c.connect(node)
483
  c.run()
484
  return c.getresult().get(node, False)
485

    
486

    
487
def call_blockdev_removechildren(node, bdev, ndevs):
488
  """Request removing a list of children from a (mirroring) device.
489

490
  This is a single-node call.
491

492
  """
493
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
494
  c = Client("blockdev_removechildren", params)
495
  c.connect(node)
496
  c.run()
497
  return c.getresult().get(node, False)
498

    
499

    
500
def call_blockdev_getmirrorstatus(node, disks):
501
  """Request status of a (mirroring) device.
502

503
  This is a single-node call.
504

505
  """
506
  params = [dsk.ToDict() for dsk in disks]
507
  c = Client("blockdev_getmirrorstatus", params)
508
  c.connect(node)
509
  c.run()
510
  return c.getresult().get(node, False)
511

    
512

    
513
def call_blockdev_find(node, disk):
514
  """Request identification of a given block device.
515

516
  This is a single-node call.
517

518
  """
519
  c = Client("blockdev_find", [disk.ToDict()])
520
  c.connect(node)
521
  c.run()
522
  return c.getresult().get(node, False)
523

    
524

    
525
def call_blockdev_close(node, disks):
526
  """Closes the given block devices.
527

528
  This is a single-node call.
529

530
  """
531
  params = [cf.ToDict() for cf in disks]
532
  c = Client("blockdev_close", params)
533
  c.connect(node)
534
  c.run()
535
  return c.getresult().get(node, False)
536

    
537

    
538
def call_upload_file(node_list, file_name):
539
  """Upload a file.
540

541
  The node will refuse the operation in case the file is not on the
542
  approved file list.
543

544
  This is a multi-node call.
545

546
  """
547
  fh = file(file_name)
548
  try:
549
    data = fh.read()
550
  finally:
551
    fh.close()
552
  st = os.stat(file_name)
553
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
554
            st.st_atime, st.st_mtime]
555
  c = Client("upload_file", params)
556
  c.connect_list(node_list)
557
  c.run()
558
  return c.getresult()
559

    
560

    
561
def call_os_diagnose(node_list):
562
  """Request a diagnose of OS definitions.
563

564
  This is a multi-node call.
565

566
  """
567
  c = Client("os_diagnose", [])
568
  c.connect_list(node_list)
569
  c.run()
570
  result = c.getresult()
571
  new_result = {}
572
  for node_name in result:
573
    if result[node_name]:
574
      nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
575
    else:
576
      nr = []
577
    new_result[node_name] = nr
578
  return new_result
579

    
580

    
581
def call_os_get(node, name):
582
  """Returns an OS definition.
583

584
  This is a single-node call.
585

586
  """
587
  c = Client("os_get", [name])
588
  c.connect(node)
589
  c.run()
590
  result = c.getresult().get(node, False)
591
  if isinstance(result, dict):
592
    return objects.OS.FromDict(result)
593
  else:
594
    return result
595

    
596

    
597
def call_hooks_runner(node_list, hpath, phase, env):
598
  """Call the hooks runner.
599

600
  Args:
601
    - op: the OpCode instance
602
    - env: a dictionary with the environment
603

604
  This is a multi-node call.
605

606
  """
607
  params = [hpath, phase, env]
608
  c = Client("hooks_runner", params)
609
  c.connect_list(node_list)
610
  c.run()
611
  result = c.getresult()
612
  return result
613

    
614

    
615
def call_iallocator_runner(node, name, idata):
616
  """Call an iallocator on a remote node
617

618
  Args:
619
    - name: the iallocator name
620
    - input: the json-encoded input string
621

622
  This is a single-node call.
623

624
  """
625
  params = [name, idata]
626
  c = Client("iallocator_runner", params)
627
  c.connect(node)
628
  c.run()
629
  result = c.getresult().get(node, False)
630
  return result
631

    
632

    
633
def call_blockdev_grow(node, cf_bdev, amount):
634
  """Request a snapshot of the given block device.
635

636
  This is a single-node call.
637

638
  """
639
  c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
640
  c.connect(node)
641
  c.run()
642
  return c.getresult().get(node, False)
643

    
644

    
645
def call_blockdev_snapshot(node, cf_bdev):
646
  """Request a snapshot of the given block device.
647

648
  This is a single-node call.
649

650
  """
651
  c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
652
  c.connect(node)
653
  c.run()
654
  return c.getresult().get(node, False)
655

    
656

    
657
def call_snapshot_export(node, snap_bdev, dest_node, instance):
658
  """Request the export of a given snapshot.
659

660
  This is a single-node call.
661

662
  """
663
  params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
664
  c = Client("snapshot_export", params)
665
  c.connect(node)
666
  c.run()
667
  return c.getresult().get(node, False)
668

    
669

    
670
def call_finalize_export(node, instance, snap_disks):
671
  """Request the completion of an export operation.
672

673
  This writes the export config file, etc.
674

675
  This is a single-node call.
676

677
  """
678
  flat_disks = []
679
  for disk in snap_disks:
680
    flat_disks.append(disk.ToDict())
681
  params = [instance.ToDict(), flat_disks]
682
  c = Client("finalize_export", params)
683
  c.connect(node)
684
  c.run()
685
  return c.getresult().get(node, False)
686

    
687

    
688
def call_export_info(node, path):
689
  """Queries the export information in a given path.
690

691
  This is a single-node call.
692

693
  """
694
  c = Client("export_info", [path])
695
  c.connect(node)
696
  c.run()
697
  result = c.getresult().get(node, False)
698
  if not result:
699
    return result
700
  return objects.SerializableConfigParser.Loads(str(result))
701

    
702

    
703
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
704
  """Request the import of a backup into an instance.
705

706
  This is a single-node call.
707

708
  """
709
  params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
710
  c = Client("instance_os_import", params)
711
  c.connect(node)
712
  c.run()
713
  return c.getresult().get(node, False)
714

    
715

    
716
def call_export_list(node_list):
717
  """Gets the stored exports list.
718

719
  This is a multi-node call.
720

721
  """
722
  c = Client("export_list", [])
723
  c.connect_list(node_list)
724
  c.run()
725
  result = c.getresult()
726
  return result
727

    
728

    
729
def call_export_remove(node, export):
730
  """Requests removal of a given export.
731

732
  This is a single-node call.
733

734
  """
735
  c = Client("export_remove", [export])
736
  c.connect(node)
737
  c.run()
738
  return c.getresult().get(node, False)
739

    
740

    
741
def call_node_leave_cluster(node):
742
  """Requests a node to clean the cluster information it has.
743

744
  This will remove the configuration information from the ganeti data
745
  dir.
746

747
  This is a single-node call.
748

749
  """
750
  c = Client("node_leave_cluster", [])
751
  c.connect(node)
752
  c.run()
753
  return c.getresult().get(node, False)
754

    
755

    
756
def call_node_volumes(node_list):
757
  """Gets all volumes on node(s).
758

759
  This is a multi-node call.
760

761
  """
762
  c = Client("node_volumes", [])
763
  c.connect_list(node_list)
764
  c.run()
765
  return c.getresult()
766

    
767

    
768
def call_test_delay(node_list, duration):
769
  """Sleep for a fixed time on given node(s).
770

771
  This is a multi-node call.
772

773
  """
774
  c = Client("test_delay", [duration])
775
  c.connect_list(node_list)
776
  c.run()
777
  return c.getresult()
778

    
779

    
780
def call_file_storage_dir_create(node, file_storage_dir):
781
  """Create the given file storage directory.
782

783
  This is a single-node call.
784

785
  """
786
  c = Client("file_storage_dir_create", [file_storage_dir])
787
  c.connect(node)
788
  c.run()
789
  return c.getresult().get(node, False)
790

    
791

    
792
def call_file_storage_dir_remove(node, file_storage_dir):
793
  """Remove the given file storage directory.
794

795
  This is a single-node call.
796

797
  """
798
  c = Client("file_storage_dir_remove", [file_storage_dir])
799
  c.connect(node)
800
  c.run()
801
  return c.getresult().get(node, False)
802

    
803

    
804
def call_file_storage_dir_rename(node, old_file_storage_dir,
805
                                 new_file_storage_dir):
806
  """Rename file storage directory.
807

808
  This is a single-node call.
809

810
  """
811
  c = Client("file_storage_dir_rename",
812
             [old_file_storage_dir, new_file_storage_dir])
813
  c.connect(node)
814
  c.run()
815
  return c.getresult().get(node, False)
816

    
817

    
818
def call_jobqueue_update(node_list, file_name, content):
819
  """Update job queue.
820

821
  This is a multi-node call.
822

823
  """
824
  c = Client("jobqueue_update", [file_name, content])
825
  c.connect_list(node_list)
826
  c.run()
827
  result = c.getresult()
828
  return result
829

    
830

    
831
def call_jobqueue_purge(node):
832
  """Purge job queue.
833

834
  This is a single-node call.
835

836
  """
837
  c = Client("jobqueue_purge", [])
838
  c.connect(node)
839
  c.run()
840
  return c.getresult().get(node, False)
841

    
842

    
843
def call_jobqueue_rename(node_list, old, new):
844
  """Rename a job queue file.
845

846
  This is a multi-node call.
847

848
  """
849
  c = Client("jobqueue_rename", [old, new])
850
  c.connect_list(node_list)
851
  c.run()
852
  result = c.getresult()
853
  return result