Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ a5efec93

History | View | Annotate | Download (7.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the LUXI protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
from ganeti import constants
33
from ganeti import objects
34
import ganeti.rpc.client as cl
35
from ganeti.rpc.errors import RequestError
36
from ganeti.rpc.transport import Transport
37

    
38
__all__ = [
39
  # classes:
40
  "Client"
41
  ]
42

    
43
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
44
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
45
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
46
REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB
47
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
48
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
49
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
50
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
51
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
52
REQ_QUERY = constants.LUXI_REQ_QUERY
53
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
54
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
55
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
56
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
57
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
58
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
59
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
60
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
61
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
62
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
63
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
64
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
65
REQ_ALL = constants.LUXI_REQ_ALL
66

    
67
DEF_RWTO = constants.LUXI_DEF_RWTO
68
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
69

    
70

    
71
class Client(cl.AbstractClient):
72
  """High-level client implementation.
73

74
  This uses a backing Transport-like class on top of which it
75
  implements data serialization/deserialization.
76

77
  """
78
  def __init__(self, address=None, timeouts=None, transport=Transport):
79
    """Constructor for the Client class.
80

81
    Arguments are the same as for L{AbstractClient}.
82

83
    """
84
    super(Client, self).__init__(address, timeouts, transport)
85
    # Override the version of the protocol:
86
    self.version = constants.LUXI_VERSION
87

    
88
  def SetQueueDrainFlag(self, drain_flag):
89
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
90

    
91
  def SetWatcherPause(self, until):
92
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
93

    
94
  def PickupJob(self, job):
95
    return self.CallMethod(REQ_PICKUP_JOB, (job,))
96

    
97
  def SubmitJob(self, ops):
98
    ops_state = map(lambda op: op.__getstate__()
99
                               if not isinstance(op, objects.ConfigObject)
100
                               else op.ToDict(_with_private=True), ops)
101
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
102

    
103
  def SubmitJobToDrainedQueue(self, ops):
104
    ops_state = map(lambda op: op.__getstate__(), ops)
105
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
106

    
107
  def SubmitManyJobs(self, jobs):
108
    jobs_state = []
109
    for ops in jobs:
110
      jobs_state.append([op.__getstate__() for op in ops])
111
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
112

    
113
  @staticmethod
114
  def _PrepareJobId(request_name, job_id):
115
    try:
116
      return int(job_id)
117
    except ValueError:
118
      raise RequestError("Invalid parameter passed to %s as job id: "
119
                         " expected integer, got value %s" %
120
                         (request_name, job_id))
121

    
122
  def CancelJob(self, job_id):
123
    job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id)
124
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
125

    
126
  def ArchiveJob(self, job_id):
127
    job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id)
128
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
129

    
130
  def ChangeJobPriority(self, job_id, priority):
131
    job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id)
132
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
133

    
134
  def AutoArchiveJobs(self, age):
135
    timeout = (DEF_RWTO - 1) / 2
136
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
137

    
138
  def WaitForJobChangeOnce(self, job_id, fields,
139
                           prev_job_info, prev_log_serial,
140
                           timeout=WFJC_TIMEOUT):
141
    """Waits for changes on a job.
142

143
    @param job_id: Job ID
144
    @type fields: list
145
    @param fields: List of field names to be observed
146
    @type prev_job_info: None or list
147
    @param prev_job_info: Previously received job information
148
    @type prev_log_serial: None or int/long
149
    @param prev_log_serial: Highest log serial number previously received
150
    @type timeout: int/float
151
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
152
                    be capped to that value)
153

154
    """
155
    assert timeout >= 0, "Timeout can not be negative"
156
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
157
                           (job_id, fields, prev_job_info,
158
                            prev_log_serial,
159
                            min(WFJC_TIMEOUT, timeout)))
160

    
161
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
162
    job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id)
163
    while True:
164
      result = self.WaitForJobChangeOnce(job_id, fields,
165
                                         prev_job_info, prev_log_serial)
166
      if result != constants.JOB_NOTCHANGED:
167
        break
168
    return result
169

    
170
  def Query(self, what, fields, qfilter):
171
    """Query for resources/items.
172

173
    @param what: One of L{constants.QR_VIA_LUXI}
174
    @type fields: List of strings
175
    @param fields: List of requested fields
176
    @type qfilter: None or list
177
    @param qfilter: Query filter
178
    @rtype: L{objects.QueryResponse}
179

180
    """
181
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
182
    return objects.QueryResponse.FromDict(result)
183

    
184
  def QueryFields(self, what, fields):
185
    """Query for available fields.
186

187
    @param what: One of L{constants.QR_VIA_LUXI}
188
    @type fields: None or list of strings
189
    @param fields: List of requested fields
190
    @rtype: L{objects.QueryFieldsResponse}
191

192
    """
193
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
194
    return objects.QueryFieldsResponse.FromDict(result)
195

    
196
  def QueryJobs(self, job_ids, fields):
197
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
198

    
199
  def QueryInstances(self, names, fields, use_locking):
200
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
201

    
202
  def QueryNodes(self, names, fields, use_locking):
203
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
204

    
205
  def QueryGroups(self, names, fields, use_locking):
206
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
207

    
208
  def QueryNetworks(self, names, fields, use_locking):
209
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
210

    
211
  def QueryExports(self, nodes, use_locking):
212
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
213

    
214
  def QueryClusterInfo(self):
215
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
216

    
217
  def QueryConfigValues(self, fields):
218
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
219

    
220
  def QueryTags(self, kind, name):
221
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))