Statistics
| Branch: | Tag: | Revision:

root / lib / rapi / client.py @ cab667cc

History | View | Annotate | Download (23.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti RAPI client."""
23

    
24
import httplib
25
import httplib2
26
import simplejson
27
import socket
28
import urllib
29
from OpenSSL import SSL
30
from OpenSSL import crypto
31

    
32

    
33
HTTP_DELETE = "DELETE"
34
HTTP_GET = "GET"
35
HTTP_PUT = "PUT"
36
HTTP_POST = "POST"
37
REPLACE_DISK_PRI = "replace_on_primary"
38
REPLACE_DISK_SECONDARY = "replace_on_secondary"
39
REPLACE_DISK_CHG = "replace_new_secondary"
40
REPLACE_DISK_AUTO = "replace_auto"
41
VALID_REPLACEMENT_MODES = frozenset([
42
    REPLACE_DISK_PRI, REPLACE_DISK_SECONDARY, REPLACE_DISK_CHG,
43
    REPLACE_DISK_AUTO
44
    ])
45
VALID_NODE_ROLES = frozenset([
46
    "drained", "master", "master-candidate", "offline", "regular"
47
    ])
48
VALID_STORAGE_TYPES = frozenset(["file", "lvm-pv", "lvm-vg"])
49

    
50

    
51
class Error(Exception):
52
  """Base error class for this module.
53

54
  """
55
  pass
56

    
57

    
58
class CertificateError(Error):
59
  """Raised when a problem is found with the SSL certificate.
60

61
  """
62
  pass
63

    
64

    
65
class GanetiApiError(Error):
66
  """Generic error raised from Ganeti API.
67

68
  """
69
  pass
70

    
71

    
72
class InvalidReplacementMode(Error):
73
  """Raised when an invalid disk replacement mode is attempted.
74

75
  """
76
  pass
77

    
78

    
79
class InvalidStorageType(Error):
80
  """Raised when an invalid storage type is used.
81

82
  """
83
  pass
84

    
85

    
86
class InvalidNodeRole(Error):
87
  """Raised when an invalid node role is used.
88

89
  """
90
  pass
91

    
92

    
93
class GanetiRapiClient(object):
94
  """Ganeti RAPI client.
95

96
  """
97

    
98
  USER_AGENT = "Ganeti RAPI Client"
99

    
100
  def __init__(self, master_hostname, port=5080, username=None, password=None,
101
               ssl_cert=None):
102
    """Constructor.
103

104
    @type master_hostname: str
105
    @param master_hostname: the ganeti cluster master to interact with
106
    @type port: int
107
    @param port: the port on which the RAPI is running. (default is 5080)
108
    @type username: str
109
    @param username: the username to connect with
110
    @type password: str
111
    @param password: the password to connect with
112
    @type ssl_cert: str or None
113
    @param ssl_cert: the expected SSL certificate. if None, SSL certificate
114
        will not be verified
115

116
    """
117
    self._master_hostname = master_hostname
118
    self._port = port
119
    if ssl_cert:
120
      _VerifyCertificate(self._master_hostname, self._port, ssl_cert)
121

    
122
    self._http = httplib2.Http()
123
    self._headers = {
124
        "Accept": "text/plain",
125
        "Content-type": "application/x-www-form-urlencoded",
126
        "User-Agent": self.USER_AGENT}
127
    self._version = None
128
    if username and password:
129
      self._http.add_credentials(username, password)
130

    
131
  def _MakeUrl(self, path, query=None, prepend_version=True):
132
    """Constructs the URL to pass to the HTTP client.
133

134
    @type path: str
135
    @param path: HTTP URL path
136
    @type query: list of two-tuples
137
    @param query: query arguments to pass to urllib.urlencode
138
    @type prepend_version: bool
139
    @param prepend_version: whether to automatically fetch and prepend the
140
        Ganeti RAPI version to the URL path
141

142
    @rtype:  str
143
    @return: URL path
144

145
    """
146
    if prepend_version:
147
      if not self._version:
148
        self._GetVersionInternal()
149
      path = "/%d%s" % (self._version, path)
150

    
151
    return "https://%(host)s:%(port)d%(path)s?%(query)s" % {
152
        "host": self._master_hostname,
153
        "port": self._port,
154
        "path": path,
155
        "query": urllib.urlencode(query or [])}
156

    
157
  def _SendRequest(self, method, path, query=None, content=None,
158
                   prepend_version=True):
159
    """Sends an HTTP request.
160

161
    This constructs a full URL, encodes and decodes HTTP bodies, and
162
    handles invalid responses in a pythonic way.
163

164
    @type method: str
165
    @param method: HTTP method to use
166
    @type path: str
167
    @param path: HTTP URL path
168
    @type query: list of two-tuples
169
    @param query: query arguments to pass to urllib.urlencode
170
    @type content: str or None
171
    @param content: HTTP body content
172
    @type prepend_version: bool
173
    @param prepend_version: whether to automatically fetch and prepend the
174
        Ganeti RAPI version to the URL path
175

176
    @rtype: str
177
    @return: JSON-Decoded response
178

179
    @raises GanetiApiError: If an invalid response is returned
180

181
    """
182
    if content:
183
      simplejson.JSONEncoder(sort_keys=True).encode(content)
184

    
185
    url = self._MakeUrl(path, query, prepend_version)
186
    resp_headers, resp_content = self._http.request(
187
        url, method, body=content, headers=self._headers)
188

    
189
    if resp_content:
190
      resp_content = simplejson.loads(resp_content)
191

    
192
    # TODO: Are there other status codes that are valid? (redirect?)
193
    if resp_headers.status != 200:
194
      if isinstance(resp_content, dict):
195
        msg = ("%s %s: %s" %
196
            (resp_content["code"], resp_content["message"],
197
             resp_content["explain"]))
198
      else:
199
        msg = resp_content
200
      raise GanetiApiError(msg)
201

    
202
    return resp_content
203

    
204
  def _GetVersionInternal(self):
205
    """Gets the Remote API version running on the cluster.
206

207
    @rtype: int
208
    @return: Ganeti version
209

210
    """
211
    self._version = self._SendRequest(HTTP_GET, "/version",
212
                                      prepend_version=False)
213
    return self._version
214

    
215
  def GetVersion(self):
216
    """Gets the Remote API version running on the cluster.
217

218
    @rtype: int
219
    @return: Ganeti version
220

221
    """
222
    if not self._version:
223
      self._GetVersionInternal()
224
    return self._version
225

    
226
  def GetOperatingSystems(self):
227
    """Gets the Operating Systems running in the Ganeti cluster.
228

229
    @rtype: list of str
230
    @return: operating systems
231

232
    """
233
    return self._SendRequest(HTTP_GET, "/os")
234

    
235
  def GetInfo(self):
236
    """Gets info about the cluster.
237

238
    @rtype: dict
239
    @return: information about the cluster
240

241
    """
242
    return self._SendRequest(HTTP_GET, "/info")
243

    
244
  def GetClusterTags(self):
245
    """Gets the cluster tags.
246

247
    @rtype: list of str
248
    @return: cluster tags
249

250
    """
251
    return self._SendRequest(HTTP_GET, "/tags")
252

    
253
  def AddClusterTags(self, tags, dry_run=False):
254
    """Adds tags to the cluster.
255

256
    @type tags: list of str
257
    @param tags: tags to add to the cluster
258
    @type dry_run: bool
259
    @param dry_run: whether to perform a dry run
260

261
    @rtype: int
262
    @return: job id
263

264
    """
265
    query = [("tag", t) for t in tags]
266
    if dry_run:
267
      query.append(("dry-run", 1))
268

    
269
    return self._SendRequest(HTTP_PUT, "/tags", query)
270

    
271
  def DeleteClusterTags(self, tags, dry_run=False):
272
    """Deletes tags from the cluster.
273

274
    @type tags: list of str
275
    @param tags: tags to delete
276
    @type dry_run: bool
277
    @param dry_run: whether to perform a dry run
278

279
    """
280
    query = [("tag", t) for t in tags]
281
    if dry_run:
282
      query.append(("dry-run", 1))
283

    
284
    self._SendRequest(HTTP_DELETE, "/tags", query)
285

    
286
  def GetInstances(self, bulk=False):
287
    """Gets information about instances on the cluster.
288

289
    @type bulk: bool
290
    @param bulk: whether to return all information about all instances
291

292
    @rtype: list of dict or list of str
293
    @return: if bulk is True, info about the instances, else a list of instances
294

295
    """
296
    query = []
297
    if bulk:
298
      query.append(("bulk", 1))
299

    
300
    instances = self._SendRequest(HTTP_GET, "/instances", query)
301
    if bulk:
302
      return instances
303
    else:
304
      return [i["id"] for i in instances]
305

    
306

    
307
  def GetInstanceInfo(self, instance):
308
    """Gets information about an instance.
309

310
    @type instance: str
311
    @param instance: instance whose info to return
312

313
    @rtype: dict
314
    @return: info about the instance
315

316
    """
317
    return self._SendRequest(HTTP_GET, "/instances/%s" % instance)
318

    
319
  def CreateInstance(self, dry_run=False):
320
    """Creates a new instance.
321

322
    @type dry_run: bool
323
    @param dry_run: whether to perform a dry run
324

325
    @rtype: int
326
    @return: job id
327

328
    """
329
    # TODO: Pass arguments needed to actually create an instance.
330
    query = []
331
    if dry_run:
332
      query.append(("dry-run", 1))
333

    
334
    return self._SendRequest(HTTP_POST, "/instances", query)
335

    
336
  def DeleteInstance(self, instance, dry_run=False):
337
    """Deletes an instance.
338

339
    @type instance: str
340
    @param instance: the instance to delete
341

342
    @rtype: int
343
    @return: job id
344

345
    """
346
    query = []
347
    if dry_run:
348
      query.append(("dry-run", 1))
349

    
350
    return self._SendRequest(HTTP_DELETE, "/instances/%s" % instance, query)
351

    
352
  def GetInstanceTags(self, instance):
353
    """Gets tags for an instance.
354

355
    @type instance: str
356
    @param instance: instance whose tags to return
357

358
    @rtype: list of str
359
    @return: tags for the instance
360

361
    """
362
    return self._SendRequest(HTTP_GET, "/instances/%s/tags" % instance)
363

    
364
  def AddInstanceTags(self, instance, tags, dry_run=False):
365
    """Adds tags to an instance.
366

367
    @type instance: str
368
    @param instance: instance to add tags to
369
    @type tags: list of str
370
    @param tags: tags to add to the instance
371
    @type dry_run: bool
372
    @param dry_run: whether to perform a dry run
373

374
    @rtype: int
375
    @return: job id
376

377
    """
378
    query = [("tag", t) for t in tags]
379
    if dry_run:
380
      query.append(("dry-run", 1))
381

    
382
    return self._SendRequest(HTTP_PUT, "/instances/%s/tags" % instance, query)
383

    
384
  def DeleteInstanceTags(self, instance, tags, dry_run=False):
385
    """Deletes tags from an instance.
386

387
    @type instance: str
388
    @param instance: instance to delete tags from
389
    @type tags: list of str
390
    @param tags: tags to delete
391
    @type dry_run: bool
392
    @param dry_run: whether to perform a dry run
393

394
    """
395
    query = [("tag", t) for t in tags]
396
    if dry_run:
397
      query.append(("dry-run", 1))
398

    
399
    self._SendRequest(HTTP_DELETE, "/instances/%s/tags" % instance, query)
400

    
401
  def RebootInstance(self, instance, reboot_type=None, ignore_secondaries=None,
402
                     dry_run=False):
403
    """Reboots an instance.
404

405
    @type instance: str
406
    @param instance: instance to rebot
407
    @type reboot_type: str
408
    @param reboot_type: one of: hard, soft, full
409
    @type ignore_secondaries: bool
410
    @param ignore_secondaries: if True, ignores errors for the secondary node
411
        while re-assembling disks (in hard-reboot mode only)
412
    @type dry_run: bool
413
    @param dry_run: whether to perform a dry run
414

415
    """
416
    query = []
417
    if reboot_type:
418
      query.append(("type", reboot_type))
419
    if ignore_secondaries is not None:
420
      query.append(("ignore_secondaries", ignore_secondaries))
421
    if dry_run:
422
      query.append(("dry-run", 1))
423

    
424
    self._SendRequest(HTTP_POST, "/instances/%s/reboot" % instance, query)
425

    
426
  def ShutdownInstance(self, instance, dry_run=False):
427
    """Shuts down an instance.
428

429
    @type instance: str
430
    @param instance: the instance to shut down
431
    @type dry_run: bool
432
    @param dry_run: whether to perform a dry run
433

434
    """
435
    query = []
436
    if dry_run:
437
      query.append(("dry-run", 1))
438

    
439
    self._SendRequest(HTTP_PUT, "/instances/%s/shutdown" % instance, query)
440

    
441
  def StartupInstance(self, instance, dry_run=False):
442
    """Starts up an instance.
443

444
    @type instance: str
445
    @param instance: the instance to start up
446
    @type dry_run: bool
447
    @param dry_run: whether to perform a dry run
448

449
    """
450
    query = []
451
    if dry_run:
452
      query.append(("dry-run", 1))
453

    
454
    self._SendRequest(HTTP_PUT, "/instances/%s/startup" % instance, query)
455

    
456
  def ReinstallInstance(self, instance, os, no_startup=False):
457
    """Reinstalls an instance.
458

459
    @type instance: str
460
    @param instance: the instance to reinstall
461
    @type os: str
462
    @param os: the os to reinstall
463
    @type no_startup: bool
464
    @param no_startup: whether to start the instance automatically
465

466
    """
467
    query = [("os", os)]
468
    if no_startup:
469
      query.append(("nostartup", 1))
470
    self._SendRequest(HTTP_POST, "/instances/%s/reinstall" % instance, query)
471

    
472
  def ReplaceInstanceDisks(self, instance, disks, mode="replace_auto",
473
                           remote_node=None, iallocator="hail", dry_run=False):
474
    """Replaces disks on an instance.
475

476
    @type instance: str
477
    @param instance: instance whose disks to replace
478
    @type disks: list of str
479
    @param disks: disks to replace
480
    @type mode: str
481
    @param mode: replacement mode to use. defaults to replace_auto
482
    @type remote_node: str or None
483
    @param remote_node: new secondary node to use (for use with
484
        replace_new_secondary mdoe)
485
    @type iallocator: str or None
486
    @param iallocator: instance allocator plugin to use (for use with
487
        replace_auto mdoe).  default is hail
488
    @type dry_run: bool
489
    @param dry_run: whether to perform a dry run
490

491
    @rtype: int
492
    @return: job id
493

494
    @raises InvalidReplacementMode: If an invalid disk replacement mode is given
495
    @raises GanetiApiError: If no secondary node is given with a non-auto
496
        replacement mode is requested.
497

498
    """
499
    if mode not in VALID_REPLACEMENT_MODES:
500
      raise InvalidReplacementMode("%s is not a valid disk replacement mode.",
501
                                   mode)
502

    
503
    query = [("mode", mode), ("disks", ",".join(disks))]
504

    
505
    if mode is REPLACE_DISK_AUTO:
506
      query.append(("iallocator", iallocator))
507
    elif mode is REPLACE_DISK_SECONDARY:
508
      if remote_node is None:
509
        raise GanetiApiError("You must supply a new secondary node.")
510
      query.append(("remote_node", remote_node))
511

    
512
    if dry_run:
513
      query.append(("dry-run", 1))
514

    
515
    return self._SendRequest(HTTP_POST,
516
                             "/instances/%s/replace-disks" % instance, query)
517

    
518
  def GetJobs(self):
519
    """Gets all jobs for the cluster.
520

521
    @rtype: list of int
522
    @return: job ids for the cluster
523

524
    """
525
    return [int(j["id"]) for j in self._SendRequest(HTTP_GET, "/jobs")]
526

    
527
  def GetJobStatus(self, job_id):
528
    """Gets the status of a job.
529

530
    @type job_id: int
531
    @param job_id: job id whose status to query
532

533
    @rtype: dict
534
    @return: job status
535

536
    """
537
    return self._SendRequest(HTTP_GET, "/jobs/%d" % job_id)
538

    
539
  def DeleteJob(self, job_id, dry_run=False):
540
    """Deletes a job.
541

542
    @type job_id: int
543
    @param job_id: id of the job to delete
544
    @type dry_run: bool
545
    @param dry_run: whether to perform a dry run
546

547
    """
548
    query = []
549
    if dry_run:
550
      query.append(("dry-run", 1))
551

    
552
    self._SendRequest(HTTP_DELETE, "/jobs/%d" % job_id, query)
553

    
554
  def GetNodes(self, bulk=False):
555
    """Gets all nodes in the cluster.
556

557
    @type bulk: bool
558
    @param bulk: whether to return all information about all instances
559

560
    @rtype: list of dict or str
561
    @return: if bulk is true, info about nodes in the cluster,
562
        else list of nodes in the cluster
563

564
    """
565
    query = []
566
    if bulk:
567
      query.append(("bulk", 1))
568

    
569
    nodes = self._SendRequest(HTTP_GET, "/nodes", query)
570
    if bulk:
571
      return nodes
572
    else:
573
      return [n["id"] for n in nodes]
574

    
575
  def GetNodeInfo(self, node):
576
    """Gets information about a node.
577

578
    @type node: str
579
    @param node: node whose info to return
580

581
    @rtype: dict
582
    @return: info about the node
583

584
    """
585
    return self._SendRequest(HTTP_GET, "/nodes/%s" % node)
586

    
587
  def EvacuateNode(self, node, iallocator=None, remote_node=None,
588
                   dry_run=False):
589
    """Evacuates instances from a Ganeti node.
590

591
    @type node: str
592
    @param node: node to evacuate
593
    @type iallocator: str or None
594
    @param iallocator: instance allocator to use
595
    @type remote_node: str
596
    @param remote_node: node to evaucate to
597
    @type dry_run: bool
598
    @param dry_run: whether to perform a dry run
599

600
    @rtype: int
601
    @return: job id
602

603
    @raises GanetiApiError: if an iallocator and remote_node are both specified
604

605
    """
606
    query = []
607
    if iallocator and remote_node:
608
      raise GanetiApiError("Only one of iallocator or remote_node can be used.")
609

    
610
    if iallocator:
611
      query.append(("iallocator", iallocator))
612
    if remote_node:
613
      query.append(("remote_node", remote_node))
614
    if dry_run:
615
      query.append(("dry-run", 1))
616

    
617
    return self._SendRequest(HTTP_POST, "/nodes/%s/evacuate" % node, query)
618

    
619
  def MigrateNode(self, node, live=True, dry_run=False):
620
    """Migrates all primary instances from a node.
621

622
    @type node: str
623
    @param node: node to migrate
624
    @type live: bool
625
    @param live: whether to use live migration
626
    @type dry_run: bool
627
    @param dry_run: whether to perform a dry run
628

629
    @rtype: int
630
    @return: job id
631

632
    """
633
    query = []
634
    if live:
635
      query.append(("live", 1))
636
    if dry_run:
637
      query.append(("dry-run", 1))
638

    
639
    return self._SendRequest(HTTP_POST, "/nodes/%s/migrate" % node, query)
640

    
641
  def GetNodeRole(self, node):
642
    """Gets the current role for a node.
643

644
    @type node: str
645
    @param node: node whose role to return
646

647
    @rtype: str
648
    @return: the current role for a node
649

650
    """
651
    return self._SendRequest(HTTP_GET, "/nodes/%s/role" % node)
652

    
653
  def SetNodeRole(self, node, role, force=False):
654
    """Sets the role for a node.
655

656
    @type node: str
657
    @param node: the node whose role to set
658
    @type role: str
659
    @param role: the role to set for the node
660
    @type force: bool
661
    @param force: whether to force the role change
662

663
    @rtype: int
664
    @return: job id
665

666
    @raise InvalidNodeRole: If an invalid node role is specified
667

668
    """
669
    if role not in VALID_NODE_ROLES:
670
      raise InvalidNodeRole("%s is not a valid node role.", role)
671

    
672
    query = [("force", force)]
673
    return self._SendRequest(HTTP_PUT, "/nodes/%s/role" % node, query,
674
                             content=role)
675

    
676
  def GetNodeStorageUnits(self, node, storage_type, output_fields):
677
    """Gets the storage units for a node.
678

679
    @type node: str
680
    @param node: the node whose storage units to return
681
    @type storage_type: str
682
    @param storage_type: storage type whose units to return
683
    @type output_fields: str
684
    @param output_fields: storage type fields to return
685

686
    @rtype: int
687
    @return: job id where results can be retrieved
688

689
    @raise InvalidStorageType: If an invalid storage type is specified
690

691
    """
692
    # TODO: Add default for storage_type & output_fields
693
    if storage_type not in VALID_STORAGE_TYPES:
694
      raise InvalidStorageType("%s is an invalid storage type.", storage_type)
695

    
696
    query = [("storage_type", storage_type), ("output_fields", output_fields)]
697
    return self._SendRequest(HTTP_GET, "/nodes/%s/storage" % node, query)
698

    
699
  def ModifyNodeStorageUnits(self, node, storage_type, name, allocatable=True):
700
    """Modifies parameters of storage units on the node.
701

702
    @type node: str
703
    @param node: node whose storage units to modify
704
    @type storage_type: str
705
    @param storage_type: storage type whose units to modify
706
    @type name: str
707
    @param name: name of the storage unit
708
    @type allocatable: bool
709
    @param allocatable: TODO: Document me
710

711
    @rtype: int
712
    @return: job id
713

714
    @raise InvalidStorageType: If an invalid storage type is specified
715

716
    """
717
    if storage_type not in VALID_STORAGE_TYPES:
718
      raise InvalidStorageType("%s is an invalid storage type.", storage_type)
719

    
720
    query = [
721
        ("storage_type", storage_type), ("name", name),
722
        ("allocatable", allocatable)
723
        ]
724
    return self._SendRequest(HTTP_PUT, "/nodes/%s/storage/modify" % node, query)
725

    
726
  def RepairNodeStorageUnits(self, node, storage_type, name):
727
    """Repairs a storage unit on the node.
728

729
    @type node: str
730
    @param node: node whose storage units to repair
731
    @type storage_type: str
732
    @param storage_type: storage type to repair
733
    @type name: str
734
    @param name: name of the storage unit to repair
735

736
    @rtype: int
737
    @return: job id
738

739
    @raise InvalidStorageType: If an invalid storage type is specified
740

741
    """
742
    if storage_type not in VALID_STORAGE_TYPES:
743
      raise InvalidStorageType("%s is an invalid storage type.", storage_type)
744

    
745
    query = [("storage_type", storage_type), ("name", name)]
746
    return self._SendRequest(HTTP_PUT, "/nodes/%s/storage/repair" % node, query)
747

    
748
  def GetNodeTags(self, node):
749
    """Gets the tags for a node.
750

751
    @type node: str
752
    @param node: node whose tags to return
753

754
    @rtype: list of str
755
    @return: tags for the node
756

757
    """
758
    return self._SendRequest(HTTP_GET, "/nodes/%s/tags" % node)
759

    
760
  def AddNodeTags(self, node, tags, dry_run=False):
761
    """Adds tags to a node.
762

763
    @type node: str
764
    @param node: node to add tags to
765
    @type tags: list of str
766
    @param tags: tags to add to the node
767
    @type dry_run: bool
768
    @param dry_run: whether to perform a dry run
769

770
    @rtype: int
771
    @return: job id
772

773
    """
774
    query = [("tag", t) for t in tags]
775
    if dry_run:
776
      query.append(("dry-run", 1))
777

    
778
    return self._SendRequest(HTTP_PUT, "/nodes/%s/tags" % node, query,
779
                             content=tags)
780

    
781
  def DeleteNodeTags(self, node, tags, dry_run=False):
782
    """Delete tags from a node.
783

784
    @type node: str
785
    @param node: node to remove tags from
786
    @type tags: list of str
787
    @param tags: tags to remove from the node
788
    @type dry_run: bool
789
    @param dry_run: whether to perform a dry run
790

791
    @rtype: int
792
    @return: job id
793

794
    """
795
    query = [("tag", t) for t in tags]
796
    if dry_run:
797
      query.append(("dry-run", 1))
798

    
799
    return self._SendRequest(HTTP_DELETE, "/nodes/%s/tags" % node, query)
800

    
801

    
802
class HTTPSConnectionOpenSSL(httplib.HTTPSConnection):
803
  """HTTPS Connection handler that verifies the SSL certificate.
804

805
  """
806

    
807
  # pylint: disable-msg=W0142
808
  def __init__(self, *args, **kwargs):
809
    """Constructor.
810

811
    """
812
    httplib.HTTPSConnection.__init__(self, *args, **kwargs)
813

    
814
    self._ssl_cert = None
815
    if self.cert_file:
816
      f = open(self.cert_file, "r")
817
      self._ssl_cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
818
      f.close()
819

    
820
  # pylint: disable-msg=W0613
821
  def _VerifySSLCertCallback(self, conn, cert, errnum, errdepth, ok):
822
    """Verifies the SSL certificate provided by the peer.
823

824
    """
825
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
826
            self._ssl_cert.digest("md5") == cert.digest("md5"))
827

    
828
  def connect(self):
829
    """Connect to the server specified when the object was created.
830

831
    This ensures that SSL certificates are verified.
832

833
    """
834
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
835
    ctx = SSL.Context(SSL.SSLv23_METHOD)
836
    ctx.set_options(SSL.OP_NO_SSLv2)
837
    ctx.use_certificate(self._ssl_cert)
838
    ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
839
                   self._VerifySSLCertCallback)
840

    
841
    ssl = SSL.Connection(ctx, sock)
842
    ssl.connect((self.host, self.port))
843
    self.sock = httplib.FakeSocket(sock, ssl)
844

    
845

    
846
def _VerifyCertificate(hostname, port, cert_file):
847
  """Verifies the SSL certificate for the given host/port.
848

849
  @type hostname: str
850
  @param hostname: the ganeti cluster master whose certificate to verify
851
  @type port: int
852
  @param port: the port on which the RAPI is running
853
  @type cert_file: str
854
  @param cert_file: filename of the expected SSL certificate
855

856
  @raises CertificateError: If an invalid SSL certificate is found
857

858
  """
859
  https = HTTPSConnectionOpenSSL(hostname, port, cert_file=cert_file)
860
  try:
861
    try:
862
      https.request(HTTP_GET, "/version")
863
    except (crypto.Error, SSL.Error):
864
      raise CertificateError("Invalid SSL certificate.")
865
  finally:
866
    https.close()