Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ d61cbe76

History | View | Annotate | Download (17.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29
import socket
30
import httplib
31

    
32
import simplejson
33

    
34
from ganeti import logger
35
from ganeti import utils
36
from ganeti import errors
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import ssconf
40

    
41

    
42
class NodeController:
43
  """Node-handling class.
44

45
  For each node that we speak with, we create an instance of this
46
  class, so that we have a safe place to store the details of this
47
  individual call.
48

49
  """
50
  def __init__(self, parent, node):
51
    self.parent = parent
52
    self.node = node
53
    self.failed = False
54

    
55
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
56
    try:
57
      hc.connect()
58
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
59
                    skip_accept_encoding=True)
60
      hc.putheader('Content-Length', str(len(parent.body)))
61
      hc.endheaders()
62
      hc.send(parent.body)
63
    except socket.error, err:
64
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
65
      self.failed = True
66

    
67
  def get_response(self):
68
    """Try to process the response from the node.
69

70
    """
71
    if self.failed:
72
      # we already failed in connect
73
      return False
74
    resp = self.http_conn.getresponse()
75
    if resp.status != 200:
76
      return False
77
    try:
78
      length = int(resp.getheader('Content-Length', '0'))
79
    except ValueError:
80
      return False
81
    if not length:
82
      logger.Error("Zero-length reply from %s" % self.node)
83
      return False
84
    payload = resp.read(length)
85
    unload = simplejson.loads(payload)
86
    return unload
87

    
88

    
89
class Client:
90
  """RPC Client class.
91

92
  This class, given a (remote) method name, a list of parameters and a
93
  list of nodes, will contact (in parallel) all nodes, and return a
94
  dict of results (key: node name, value: result).
95

96
  One current bug is that generic failure is still signalled by
97
  'False' result, which is not good. This overloading of values can
98
  cause bugs.
99

100
  """
101
  result_set = False
102
  result = False
103
  allresult = []
104

    
105
  def __init__(self, procedure, args):
106
    ss = ssconf.SimpleStore()
107
    self.port = ss.GetNodeDaemonPort()
108
    self.nodepw = ss.GetNodeDaemonPassword()
109
    self.nc = {}
110
    self.results = {}
111
    self.procedure = procedure
112
    self.args = args
113
    self.body = simplejson.dumps(args)
114

    
115
  #--- generic connector -------------
116

    
117
  def connect_list(self, node_list):
118
    """Add a list of nodes to the target nodes.
119

120
    """
121
    for node in node_list:
122
      self.connect(node)
123

    
124
  def connect(self, connect_node):
125
    """Add a node to the target list.
126

127
    """
128
    self.nc[connect_node] = nc = NodeController(self, connect_node)
129

    
130
  def getresult(self):
131
    """Return the results of the call.
132

133
    """
134
    return self.results
135

    
136
  def run(self):
137
    """Wrapper over reactor.run().
138

139
    This function simply calls reactor.run() if we have any requests
140
    queued, otherwise it does nothing.
141

142
    """
143
    for node, nc in self.nc.items():
144
      self.results[node] = nc.get_response()
145

    
146

    
147
def call_volume_list(node_list, vg_name):
148
  """Gets the logical volumes present in a given volume group.
149

150
  This is a multi-node call.
151

152
  """
153
  c = Client("volume_list", [vg_name])
154
  c.connect_list(node_list)
155
  c.run()
156
  return c.getresult()
157

    
158

    
159
def call_vg_list(node_list):
160
  """Gets the volume group list.
161

162
  This is a multi-node call.
163

164
  """
165
  c = Client("vg_list", [])
166
  c.connect_list(node_list)
167
  c.run()
168
  return c.getresult()
169

    
170

    
171
def call_bridges_exist(node, bridges_list):
172
  """Checks if a node has all the bridges given.
173

174
  This method checks if all bridges given in the bridges_list are
175
  present on the remote node, so that an instance that uses interfaces
176
  on those bridges can be started.
177

178
  This is a single-node call.
179

180
  """
181
  c = Client("bridges_exist", [bridges_list])
182
  c.connect(node)
183
  c.run()
184
  return c.getresult().get(node, False)
185

    
186

    
187
def call_instance_start(node, instance, extra_args):
188
  """Starts an instance.
189

190
  This is a single-node call.
191

192
  """
193
  c = Client("instance_start", [instance.ToDict(), extra_args])
194
  c.connect(node)
195
  c.run()
196
  return c.getresult().get(node, False)
197

    
198

    
199
def call_instance_shutdown(node, instance):
200
  """Stops an instance.
201

202
  This is a single-node call.
203

204
  """
205
  c = Client("instance_shutdown", [instance.ToDict()])
206
  c.connect(node)
207
  c.run()
208
  return c.getresult().get(node, False)
209

    
210

    
211
def call_instance_migrate(node, instance, target, live):
212
  """Migrate an instance.
213

214
  This is a single-node call.
215

216
  """
217
  c = Client("instance_migrate", [instance.name, target, live])
218
  c.connect(node)
219
  c.run()
220
  return c.getresult().get(node, False)
221

    
222

    
223
def call_instance_reboot(node, instance, reboot_type, extra_args):
224
  """Reboots an instance.
225

226
  This is a single-node call.
227

228
  """
229
  c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
230
  c.connect(node)
231
  c.run()
232
  return c.getresult().get(node, False)
233

    
234

    
235
def call_instance_os_add(node, inst, osdev, swapdev):
236
  """Installs an OS on the given instance.
237

238
  This is a single-node call.
239

240
  """
241
  params = [inst.ToDict(), osdev, swapdev]
242
  c = Client("instance_os_add", params)
243
  c.connect(node)
244
  c.run()
245
  return c.getresult().get(node, False)
246

    
247

    
248
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
249
  """Run the OS rename script for an instance.
250

251
  This is a single-node call.
252

253
  """
254
  params = [inst.ToDict(), old_name, osdev, swapdev]
255
  c = Client("instance_run_rename", params)
256
  c.connect(node)
257
  c.run()
258
  return c.getresult().get(node, False)
259

    
260

    
261
def call_instance_info(node, instance):
262
  """Returns information about a single instance.
263

264
  This is a single-node call.
265

266
  """
267
  c = Client("instance_info", [instance])
268
  c.connect(node)
269
  c.run()
270
  return c.getresult().get(node, False)
271

    
272

    
273
def call_all_instances_info(node_list):
274
  """Returns information about all instances on a given node.
275

276
  This is a single-node call.
277

278
  """
279
  c = Client("all_instances_info", [])
280
  c.connect_list(node_list)
281
  c.run()
282
  return c.getresult()
283

    
284

    
285
def call_instance_list(node_list):
286
  """Returns the list of running instances on a given node.
287

288
  This is a single-node call.
289

290
  """
291
  c = Client("instance_list", [])
292
  c.connect_list(node_list)
293
  c.run()
294
  return c.getresult()
295

    
296

    
297
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
298
  """Do a TcpPing on the remote node
299

300
  This is a single-node call.
301
  """
302
  c = Client("node_tcp_ping", [source, target, port, timeout,
303
                               live_port_needed])
304
  c.connect(node)
305
  c.run()
306
  return c.getresult().get(node, False)
307

    
308

    
309
def call_node_info(node_list, vg_name):
310
  """Return node information.
311

312
  This will return memory information and volume group size and free
313
  space.
314

315
  This is a multi-node call.
316

317
  """
318
  c = Client("node_info", [vg_name])
319
  c.connect_list(node_list)
320
  c.run()
321
  retux = c.getresult()
322

    
323
  for node_name in retux:
324
    ret = retux.get(node_name, False)
325
    if type(ret) != dict:
326
      logger.Error("could not connect to node %s" % (node_name))
327
      ret = {}
328

    
329
    utils.CheckDict(ret,
330
                    { 'memory_total' : '-',
331
                      'memory_dom0' : '-',
332
                      'memory_free' : '-',
333
                      'vg_size' : 'node_unreachable',
334
                      'vg_free' : '-' },
335
                    "call_node_info",
336
                    )
337
  return retux
338

    
339

    
340
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
341
  """Add a node to the cluster.
342

343
  This is a single-node call.
344

345
  """
346
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
347
  c = Client("node_add", params)
348
  c.connect(node)
349
  c.run()
350
  return c.getresult().get(node, False)
351

    
352

    
353
def call_node_verify(node_list, checkdict):
354
  """Request verification of given parameters.
355

356
  This is a multi-node call.
357

358
  """
359
  c = Client("node_verify", [checkdict])
360
  c.connect_list(node_list)
361
  c.run()
362
  return c.getresult()
363

    
364

    
365
def call_node_start_master(node):
366
  """Tells a node to activate itself as a master.
367

368
  This is a single-node call.
369

370
  """
371
  c = Client("node_start_master", [])
372
  c.connect(node)
373
  c.run()
374
  return c.getresult().get(node, False)
375

    
376

    
377
def call_node_stop_master(node):
378
  """Tells a node to demote itself from master status.
379

380
  This is a single-node call.
381

382
  """
383
  c = Client("node_stop_master", [])
384
  c.connect(node)
385
  c.run()
386
  return c.getresult().get(node, False)
387

    
388

    
389
def call_version(node_list):
390
  """Query node version.
391

392
  This is a multi-node call.
393

394
  """
395
  c = Client("version", [])
396
  c.connect_list(node_list)
397
  c.run()
398
  return c.getresult()
399

    
400

    
401
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
402
  """Request creation of a given block device.
403

404
  This is a single-node call.
405

406
  """
407
  params = [bdev.ToDict(), size, owner, on_primary, info]
408
  c = Client("blockdev_create", params)
409
  c.connect(node)
410
  c.run()
411
  return c.getresult().get(node, False)
412

    
413

    
414
def call_blockdev_remove(node, bdev):
415
  """Request removal of a given block device.
416

417
  This is a single-node call.
418

419
  """
420
  c = Client("blockdev_remove", [bdev.ToDict()])
421
  c.connect(node)
422
  c.run()
423
  return c.getresult().get(node, False)
424

    
425

    
426
def call_blockdev_rename(node, devlist):
427
  """Request rename of the given block devices.
428

429
  This is a single-node call.
430

431
  """
432
  params = [(d.ToDict(), uid) for d, uid in devlist]
433
  c = Client("blockdev_rename", params)
434
  c.connect(node)
435
  c.run()
436
  return c.getresult().get(node, False)
437

    
438

    
439
def call_blockdev_assemble(node, disk, owner, on_primary):
440
  """Request assembling of a given block device.
441

442
  This is a single-node call.
443

444
  """
445
  params = [disk.ToDict(), owner, on_primary]
446
  c = Client("blockdev_assemble", params)
447
  c.connect(node)
448
  c.run()
449
  return c.getresult().get(node, False)
450

    
451

    
452
def call_blockdev_shutdown(node, disk):
453
  """Request shutdown of a given block device.
454

455
  This is a single-node call.
456

457
  """
458
  c = Client("blockdev_shutdown", [disk.ToDict()])
459
  c.connect(node)
460
  c.run()
461
  return c.getresult().get(node, False)
462

    
463

    
464
def call_blockdev_addchildren(node, bdev, ndevs):
465
  """Request adding a list of children to a (mirroring) device.
466

467
  This is a single-node call.
468

469
  """
470
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
471
  c = Client("blockdev_addchildren", params)
472
  c.connect(node)
473
  c.run()
474
  return c.getresult().get(node, False)
475

    
476

    
477
def call_blockdev_removechildren(node, bdev, ndevs):
478
  """Request removing a list of children from a (mirroring) device.
479

480
  This is a single-node call.
481

482
  """
483
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
484
  c = Client("blockdev_removechildren", params)
485
  c.connect(node)
486
  c.run()
487
  return c.getresult().get(node, False)
488

    
489

    
490
def call_blockdev_getmirrorstatus(node, disks):
491
  """Request status of a (mirroring) device.
492

493
  This is a single-node call.
494

495
  """
496
  params = [dsk.ToDict() for dsk in disks]
497
  c = Client("blockdev_getmirrorstatus", params)
498
  c.connect(node)
499
  c.run()
500
  return c.getresult().get(node, False)
501

    
502

    
503
def call_blockdev_find(node, disk):
504
  """Request identification of a given block device.
505

506
  This is a single-node call.
507

508
  """
509
  c = Client("blockdev_find", [disk.ToDict()])
510
  c.connect(node)
511
  c.run()
512
  return c.getresult().get(node, False)
513

    
514

    
515
def call_blockdev_close(node, disks):
516
  """Closes the given block devices.
517

518
  This is a single-node call.
519

520
  """
521
  params = [cf.ToDict() for cf in disks]
522
  c = Client("blockdev_close", params)
523
  c.connect(node)
524
  c.run()
525
  return c.getresult().get(node, False)
526

    
527

    
528
def call_upload_file(node_list, file_name):
529
  """Upload a file.
530

531
  The node will refuse the operation in case the file is not on the
532
  approved file list.
533

534
  This is a multi-node call.
535

536
  """
537
  fh = file(file_name)
538
  try:
539
    data = fh.read()
540
  finally:
541
    fh.close()
542
  st = os.stat(file_name)
543
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
544
            st.st_atime, st.st_mtime]
545
  c = Client("upload_file", params)
546
  c.connect_list(node_list)
547
  c.run()
548
  return c.getresult()
549

    
550

    
551
def call_os_diagnose(node_list):
552
  """Request a diagnose of OS definitions.
553

554
  This is a multi-node call.
555

556
  """
557
  c = Client("os_diagnose", [])
558
  c.connect_list(node_list)
559
  c.run()
560
  result = c.getresult()
561
  new_result = {}
562
  for node_name in result:
563
    if result[node_name]:
564
      nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
565
    else:
566
      nr = []
567
    new_result[node_name] = nr
568
  return new_result
569

    
570

    
571
def call_os_get(node, name):
572
  """Returns an OS definition.
573

574
  This is a single-node call.
575

576
  """
577
  c = Client("os_get", [name])
578
  c.connect(node)
579
  c.run()
580
  result = c.getresult().get(node, False)
581
  if isinstance(result, dict):
582
    return objects.OS.FromDict(result)
583
  else:
584
    return result
585

    
586

    
587
def call_hooks_runner(node_list, hpath, phase, env):
588
  """Call the hooks runner.
589

590
  Args:
591
    - op: the OpCode instance
592
    - env: a dictionary with the environment
593

594
  This is a multi-node call.
595

596
  """
597
  params = [hpath, phase, env]
598
  c = Client("hooks_runner", params)
599
  c.connect_list(node_list)
600
  c.run()
601
  result = c.getresult()
602
  return result
603

    
604

    
605
def call_iallocator_runner(node, name, idata):
606
  """Call an iallocator on a remote node
607

608
  Args:
609
    - name: the iallocator name
610
    - input: the json-encoded input string
611

612
  This is a single-node call.
613

614
  """
615
  params = [name, idata]
616
  c = Client("iallocator_runner", params)
617
  c.connect(node)
618
  c.run()
619
  result = c.getresult().get(node, False)
620
  return result
621

    
622

    
623
def call_blockdev_grow(node, cf_bdev, amount):
624
  """Request a snapshot of the given block device.
625

626
  This is a single-node call.
627

628
  """
629
  c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
630
  c.connect(node)
631
  c.run()
632
  return c.getresult().get(node, False)
633

    
634

    
635
def call_blockdev_snapshot(node, cf_bdev):
636
  """Request a snapshot of the given block device.
637

638
  This is a single-node call.
639

640
  """
641
  c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
642
  c.connect(node)
643
  c.run()
644
  return c.getresult().get(node, False)
645

    
646

    
647
def call_snapshot_export(node, snap_bdev, dest_node, instance):
648
  """Request the export of a given snapshot.
649

650
  This is a single-node call.
651

652
  """
653
  params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
654
  c = Client("snapshot_export", params)
655
  c.connect(node)
656
  c.run()
657
  return c.getresult().get(node, False)
658

    
659

    
660
def call_finalize_export(node, instance, snap_disks):
661
  """Request the completion of an export operation.
662

663
  This writes the export config file, etc.
664

665
  This is a single-node call.
666

667
  """
668
  flat_disks = []
669
  for disk in snap_disks:
670
    flat_disks.append(disk.ToDict())
671
  params = [instance.ToDict(), flat_disks]
672
  c = Client("finalize_export", params)
673
  c.connect(node)
674
  c.run()
675
  return c.getresult().get(node, False)
676

    
677

    
678
def call_export_info(node, path):
679
  """Queries the export information in a given path.
680

681
  This is a single-node call.
682

683
  """
684
  c = Client("export_info", [path])
685
  c.connect(node)
686
  c.run()
687
  result = c.getresult().get(node, False)
688
  if not result:
689
    return result
690
  return objects.SerializableConfigParser.Loads(str(result))
691

    
692

    
693
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
694
  """Request the import of a backup into an instance.
695

696
  This is a single-node call.
697

698
  """
699
  params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
700
  c = Client("instance_os_import", params)
701
  c.connect(node)
702
  c.run()
703
  return c.getresult().get(node, False)
704

    
705

    
706
def call_export_list(node_list):
707
  """Gets the stored exports list.
708

709
  This is a multi-node call.
710

711
  """
712
  c = Client("export_list", [])
713
  c.connect_list(node_list)
714
  c.run()
715
  result = c.getresult()
716
  return result
717

    
718

    
719
def call_export_remove(node, export):
720
  """Requests removal of a given export.
721

722
  This is a single-node call.
723

724
  """
725
  c = Client("export_remove", [export])
726
  c.connect(node)
727
  c.run()
728
  return c.getresult().get(node, False)
729

    
730

    
731
def call_node_leave_cluster(node):
732
  """Requests a node to clean the cluster information it has.
733

734
  This will remove the configuration information from the ganeti data
735
  dir.
736

737
  This is a single-node call.
738

739
  """
740
  c = Client("node_leave_cluster", [])
741
  c.connect(node)
742
  c.run()
743
  return c.getresult().get(node, False)
744

    
745

    
746
def call_node_volumes(node_list):
747
  """Gets all volumes on node(s).
748

749
  This is a multi-node call.
750

751
  """
752
  c = Client("node_volumes", [])
753
  c.connect_list(node_list)
754
  c.run()
755
  return c.getresult()
756

    
757

    
758
def call_test_delay(node_list, duration):
759
  """Sleep for a fixed time on given node(s).
760

761
  This is a multi-node call.
762

763
  """
764
  c = Client("test_delay", [duration])
765
  c.connect_list(node_list)
766
  c.run()
767
  return c.getresult()
768

    
769

    
770
def call_file_storage_dir_create(node, file_storage_dir):
771
  """Create the given file storage directory.
772

773
  This is a single-node call.
774

775
  """
776
  c = Client("file_storage_dir_create", [file_storage_dir])
777
  c.connect(node)
778
  c.run()
779
  return c.getresult().get(node, False)
780

    
781

    
782
def call_file_storage_dir_remove(node, file_storage_dir):
783
  """Remove the given file storage directory.
784

785
  This is a single-node call.
786

787
  """
788
  c = Client("file_storage_dir_remove", [file_storage_dir])
789
  c.connect(node)
790
  c.run()
791
  return c.getresult().get(node, False)
792

    
793

    
794
def call_file_storage_dir_rename(node, old_file_storage_dir,
795
                                 new_file_storage_dir):
796
  """Rename file storage directory.
797

798
  This is a single-node call.
799

800
  """
801
  c = Client("file_storage_dir_rename",
802
             [old_file_storage_dir, new_file_storage_dir])
803
  c.connect(node)
804
  c.run()
805
  return c.getresult().get(node, False)
806