Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 5349519d

History | View | Annotate | Download (8.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the LUXI protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
from ganeti import constants
33
from ganeti import pathutils
34
from ganeti import objects
35
import ganeti.rpc.client as cl
36
from ganeti.rpc.errors import RequestError
37
from ganeti.rpc.transport import Transport
38

    
39
__all__ = [
40
  # classes:
41
  "Client"
42
  ]
43

    
44
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
45
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
46
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
47
REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB
48
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
49
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
50
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
51
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
52
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
53
REQ_QUERY = constants.LUXI_REQ_QUERY
54
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
55
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
56
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
57
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
58
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
59
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
60
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
61
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
62
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
63
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
64
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
65
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
66
REQ_ALL = constants.LUXI_REQ_ALL
67

    
68
DEF_RWTO = constants.LUXI_DEF_RWTO
69
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
70

    
71

    
72
class Client(cl.AbstractClient):
73
  """High-level client implementation.
74

75
  This uses a backing Transport-like class on top of which it
76
  implements data serialization/deserialization.
77

78
  """
79
  def __init__(self, address=None, timeouts=None, transport=Transport):
80
    """Constructor for the Client class.
81

82
    Arguments are the same as for L{AbstractClient}.
83

84
    """
85
    super(Client, self).__init__(timeouts, transport)
86
    # Override the version of the protocol:
87
    self.version = constants.LUXI_VERSION
88
    # Store the socket address
89
    if address is None:
90
      address = pathutils.QUERY_SOCKET
91
    self.address = address
92
    self._InitTransport()
93

    
94
  def _GetAddress(self):
95
    return self.address
96

    
97
  def SetQueueDrainFlag(self, drain_flag):
98
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
99

    
100
  def SetWatcherPause(self, until):
101
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
102

    
103
  def PickupJob(self, job):
104
    return self.CallMethod(REQ_PICKUP_JOB, (job,))
105

    
106
  def SubmitJob(self, ops):
107
    ops_state = map(lambda op: op.__getstate__()
108
                               if not isinstance(op, objects.ConfigObject)
109
                               else op.ToDict(_with_private=True), ops)
110
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
111

    
112
  def SubmitJobToDrainedQueue(self, ops):
113
    ops_state = map(lambda op: op.__getstate__(), ops)
114
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
115

    
116
  def SubmitManyJobs(self, jobs):
117
    jobs_state = []
118
    for ops in jobs:
119
      jobs_state.append([op.__getstate__() for op in ops])
120
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
121

    
122
  @staticmethod
123
  def _PrepareJobId(request_name, job_id):
124
    try:
125
      return int(job_id)
126
    except ValueError:
127
      raise RequestError("Invalid parameter passed to %s as job id: "
128
                         " expected integer, got value %s" %
129
                         (request_name, job_id))
130

    
131
  def CancelJob(self, job_id):
132
    job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id)
133
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
134

    
135
  def ArchiveJob(self, job_id):
136
    job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id)
137
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
138

    
139
  def ChangeJobPriority(self, job_id, priority):
140
    job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id)
141
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
142

    
143
  def AutoArchiveJobs(self, age):
144
    timeout = (DEF_RWTO - 1) / 2
145
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
146

    
147
  def WaitForJobChangeOnce(self, job_id, fields,
148
                           prev_job_info, prev_log_serial,
149
                           timeout=WFJC_TIMEOUT):
150
    """Waits for changes on a job.
151

152
    @param job_id: Job ID
153
    @type fields: list
154
    @param fields: List of field names to be observed
155
    @type prev_job_info: None or list
156
    @param prev_job_info: Previously received job information
157
    @type prev_log_serial: None or int/long
158
    @param prev_log_serial: Highest log serial number previously received
159
    @type timeout: int/float
160
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
161
                    be capped to that value)
162

163
    """
164
    assert timeout >= 0, "Timeout can not be negative"
165
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
166
                           (job_id, fields, prev_job_info,
167
                            prev_log_serial,
168
                            min(WFJC_TIMEOUT, timeout)))
169

    
170
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
171
    job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id)
172
    while True:
173
      result = self.WaitForJobChangeOnce(job_id, fields,
174
                                         prev_job_info, prev_log_serial)
175
      if result != constants.JOB_NOTCHANGED:
176
        break
177
    return result
178

    
179
  def Query(self, what, fields, qfilter):
180
    """Query for resources/items.
181

182
    @param what: One of L{constants.QR_VIA_LUXI}
183
    @type fields: List of strings
184
    @param fields: List of requested fields
185
    @type qfilter: None or list
186
    @param qfilter: Query filter
187
    @rtype: L{objects.QueryResponse}
188

189
    """
190
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
191
    return objects.QueryResponse.FromDict(result)
192

    
193
  def QueryFields(self, what, fields):
194
    """Query for available fields.
195

196
    @param what: One of L{constants.QR_VIA_LUXI}
197
    @type fields: None or list of strings
198
    @param fields: List of requested fields
199
    @rtype: L{objects.QueryFieldsResponse}
200

201
    """
202
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
203
    return objects.QueryFieldsResponse.FromDict(result)
204

    
205
  def QueryJobs(self, job_ids, fields):
206
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
207

    
208
  def QueryInstances(self, names, fields, use_locking):
209
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
210

    
211
  def QueryNodes(self, names, fields, use_locking):
212
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
213

    
214
  def QueryGroups(self, names, fields, use_locking):
215
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
216

    
217
  def QueryNetworks(self, names, fields, use_locking):
218
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
219

    
220
  def QueryExports(self, nodes, use_locking):
221
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
222

    
223
  def QueryClusterInfo(self):
224
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
225

    
226
  def QueryConfigValues(self, fields):
227
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
228

    
229
  def QueryTags(self, kind, name):
230
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))