Revision 479636a3

b/lib/cli.py
50 50
           "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
51 51
           "FormatError", "SplitNodeOption", "SubmitOrSend",
52 52
           "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
53
           "ValidateBeParams",
54
           "ToStderr", "ToStdout",
55
           "UsesRPC",
56
           "GetOnlineNodes",
53
           "ValidateBeParams", "ToStderr", "ToStdout", "UsesRPC",
54
           "GetOnlineNodes", "JobExecutor",
57 55
           ]
58 56

  
59 57

  
......
989 987

  
990 988
  """
991 989
  _ToStream(sys.stderr, txt, *args)
990

  
991

  
992
class JobExecutor(object):
993
  """Class which manages the submission and execution of multiple jobs.
994

  
995
  Note that instances of this class should not be reused between
996
  GetResults() calls.
997

  
998
  """
999
  def __init__(self, cl=None, verbose=True):
1000
    self.queue = []
1001
    if cl is None:
1002
      cl = GetClient()
1003
    self.cl = cl
1004
    self.verbose = verbose
1005

  
1006
  def QueueJob(self, name, *ops):
1007
    """Submit a job for execution.
1008

  
1009
    @type name: string
1010
    @param name: a description of the job, will be used in WaitJobSet
1011
    """
1012
    job_id = SendJob(ops, cl=self.cl)
1013
    self.queue.append((job_id, name))
1014

  
1015
  def GetResults(self):
1016
    """Wait for and return the results of all jobs.
1017

  
1018
    @rtype: list
1019
    @return: list of tuples (success, job results), in the same order
1020
        as the submitted jobs; if a job has failed, instead of the result
1021
        there will be the error message
1022

  
1023
    """
1024
    results = []
1025
    if self.verbose:
1026
      ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
1027
    for jid, name in self.queue:
1028
      if self.verbose:
1029
        ToStdout("Waiting for job %s for %s...", jid, name)
1030
      try:
1031
        job_result = PollJob(jid, cl=self.cl)
1032
        success = True
1033
      except (errors.GenericError, luxi.ProtocolError), err:
1034
        _, job_result = FormatError(err)
1035
        success = False
1036
        # the error message will always be shown, verbose or not
1037
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1038

  
1039
      results.append((success, job_result))
1040
    return results
1041

  
1042
  def WaitOrShow(self, wait):
1043
    """Wait for job results or only print the job IDs.
1044

  
1045
    @type wait: boolean
1046
    @param wait: whether to wait or not
1047

  
1048
    """
1049
    if wait:
1050
      return self.GetResults()
1051
    else:
1052
      for jid, name in self.queue:
1053
        ToStdout("%s: %s", jid, name)
b/scripts/gnt-instance
53 53
  ]
54 54

  
55 55

  
56
def _ExpandMultiNames(mode, names):
56
def _ExpandMultiNames(mode, names, client=None):
57 57
  """Expand the given names using the passed mode.
58 58

  
59 59
  For _SHUTDOWN_CLUSTER, all instances will be returned. For
......
76 76
  @raise errors.OpPrereqError: for invalid input parameters
77 77

  
78 78
  """
79
  if client is None:
80
    client = GetClient()
79 81
  if mode == _SHUTDOWN_CLUSTER:
80 82
    if names:
81 83
      raise errors.OpPrereqError("Cluster filter mode takes no arguments")
82
    client = GetClient()
83 84
    idata = client.QueryInstances([], ["name"])
84 85
    inames = [row[0] for row in idata]
85 86

  
......
88 89
                _SHUTDOWN_NODES_SEC):
89 90
    if not names:
90 91
      raise errors.OpPrereqError("No node names passed")
91
    client = GetClient()
92 92
    ndata = client.QueryNodes(names, ["name", "pinst_list", "sinst_list"])
93 93
    ipri = [row[1] for row in ndata]
94 94
    pri_names = list(itertools.chain(*ipri))
......
106 106
  elif mode == _SHUTDOWN_INSTANCES:
107 107
    if not names:
108 108
      raise errors.OpPrereqError("No instance names passed")
109
    client = GetClient()
110 109
    idata = client.QueryInstances(names, ["name"])
111 110
    inames = [row[0] for row in idata]
112 111

  
......
673 672
  @return: the desired exit code
674 673

  
675 674
  """
675
  cl = GetClient()
676 676
  if opts.multi_mode is None:
677 677
    opts.multi_mode = _SHUTDOWN_INSTANCES
678
  inames = _ExpandMultiNames(opts.multi_mode, args)
678
  inames = _ExpandMultiNames(opts.multi_mode, args, client=cl)
679 679
  if not inames:
680 680
    raise errors.OpPrereqError("Selection filter does not match any instances")
681 681
  multi_on = opts.multi_mode != _SHUTDOWN_INSTANCES or len(inames) > 1
682 682
  if not (opts.force_multi or not multi_on
683 683
          or _ConfirmOperation(inames, "startup")):
684 684
    return 1
685
  jex = cli.JobExecutor(verbose=multi_on, cl=cl)
685 686
  for name in inames:
686 687
    op = opcodes.OpStartupInstance(instance_name=name,
687 688
                                   force=opts.force,
688 689
                                   extra_args=opts.extra_args)
689
    if multi_on:
690
      ToStdout("Starting up %s", name)
691
    try:
692
      SubmitOrSend(op, opts)
693
    except JobSubmittedException, err:
694
      _, txt = FormatError(err)
695
      ToStdout("%s", txt)
690
    jex.QueueJob(name, op)
691
  jex.WaitOrShow(not opts.submit_only)
696 692
  return 0
697 693

  
698 694

  
......
711 707
  @return: the desired exit code
712 708

  
713 709
  """
710
  cl = GetClient()
714 711
  if opts.multi_mode is None:
715 712
    opts.multi_mode = _SHUTDOWN_INSTANCES
716
  inames = _ExpandMultiNames(opts.multi_mode, args)
713
  inames = _ExpandMultiNames(opts.multi_mode, args, client=cl)
717 714
  if not inames:
718 715
    raise errors.OpPrereqError("Selection filter does not match any instances")
719 716
  multi_on = opts.multi_mode != _SHUTDOWN_INSTANCES or len(inames) > 1
720 717
  if not (opts.force_multi or not multi_on
721 718
          or _ConfirmOperation(inames, "reboot")):
722 719
    return 1
720
  jex = JobExecutor(verbose=multi_on, cl=cl)
723 721
  for name in inames:
724 722
    op = opcodes.OpRebootInstance(instance_name=name,
725 723
                                  reboot_type=opts.reboot_type,
726 724
                                  ignore_secondaries=opts.ignore_secondaries)
727

  
728
    SubmitOrSend(op, opts)
725
    jex.QueueJob(name, op)
726
  jex.WaitOrShow(not opts.submit_only)
729 727
  return 0
730 728

  
731 729

  
......
741 739
  @return: the desired exit code
742 740

  
743 741
  """
742
  cl = GetClient()
744 743
  if opts.multi_mode is None:
745 744
    opts.multi_mode = _SHUTDOWN_INSTANCES
746
  inames = _ExpandMultiNames(opts.multi_mode, args)
745
  inames = _ExpandMultiNames(opts.multi_mode, args, client=cl)
747 746
  if not inames:
748 747
    raise errors.OpPrereqError("Selection filter does not match any instances")
749 748
  multi_on = opts.multi_mode != _SHUTDOWN_INSTANCES or len(inames) > 1
750 749
  if not (opts.force_multi or not multi_on
751 750
          or _ConfirmOperation(inames, "shutdown")):
752 751
    return 1
752

  
753
  jex = cli.JobExecutor(verbose=multi_on, cl=cl)
753 754
  for name in inames:
754 755
    op = opcodes.OpShutdownInstance(instance_name=name)
755
    if multi_on:
756
      ToStdout("Shutting down %s", name)
757
    try:
758
      SubmitOrSend(op, opts)
759
    except JobSubmittedException, err:
760
      _, txt = FormatError(err)
761
      ToStdout("%s", txt)
756
    jex.QueueJob(name, op)
757
  jex.WaitOrShow(not opts.submit_only)
762 758
  return 0
763 759

  
764 760

  
b/scripts/gnt-node
167 167
  @return: the desired exit code
168 168

  
169 169
  """
170
  cl = GetClient()
170 171
  force = opts.force
171 172
  selected_fields = ["name", "sinst_list"]
172 173
  src_node, dst_node = args
173 174

  
174 175
  op = opcodes.OpQueryNodes(output_fields=selected_fields, names=[src_node])
175
  result = SubmitOpCode(op)
176
  result = SubmitOpCode(op, cl=cl)
176 177
  src_node, sinst = result[0]
177 178
  op = opcodes.OpQueryNodes(output_fields=["name"], names=[dst_node])
178
  result = SubmitOpCode(op)
179
  result = SubmitOpCode(op, cl=cl)
179 180
  dst_node = result[0][0]
180 181

  
181 182
  if src_node == dst_node:
......
189 190

  
190 191
  sinst = utils.NiceSort(sinst)
191 192

  
192
  retcode = constants.EXIT_SUCCESS
193

  
194 193
  if not force and not AskUser("Relocate instance(s) %s from node\n"
195 194
                               " %s to node\n %s?" %
196 195
                               (",".join("'%s'" % name for name in sinst),
197 196
                               src_node, dst_node)):
198 197
    return constants.EXIT_CONFIRMATION
199 198

  
200
  good_cnt = bad_cnt = 0
199
  jex = JobExecutor()
201 200
  for iname in sinst:
202 201
    op = opcodes.OpReplaceDisks(instance_name=iname,
203 202
                                remote_node=dst_node,
204
                                mode=constants.REPLACE_DISK_ALL,
205
                                disks=["sda", "sdb"])
206
    try:
207
      ToStdout("Replacing disks for instance %s", iname)
208
      SubmitOpCode(op)
209
      ToStdout("Instance %s has been relocated", iname)
210
      good_cnt += 1
211
    except errors.GenericError, err:
212
      nret, msg = FormatError(err)
213
      retcode |= nret
214
      ToStderr("Error replacing disks for instance %s: %s", iname, msg)
215
      bad_cnt += 1
216

  
217
  if retcode == constants.EXIT_SUCCESS:
218
    ToStdout("All %d instance(s) relocated successfully.", good_cnt)
203
                                mode=constants.REPLACE_DISK_CHG,
204
                                disks=[])
205
    jex.QueueJob(iname, op)
206

  
207
  results = jex.GetResults()
208

  
209
  bad_cnt = len([row for row in results if not row[0]])
210
  if bad_cnt == 0:
211
    ToStdout("All %d instance(s) relocated successfully.", len(results))
212
    retcode = constants.EXIT_SUCCESS
219 213
  else:
220 214
    ToStdout("There were errors during the relocation:\n"
221
             "%d error(s) out of %d instance(s).", bad_cnt, good_cnt + bad_cnt)
215
             "%d error(s) out of %d instance(s).", bad_cnt, len(results))
216
    retcode = constants.EXIT_FAILURE
222 217
  return retcode
223 218

  
224 219

  
......
232 227
  @return: the desired exit code
233 228

  
234 229
  """
230
  cl = GetClient()
235 231
  force = opts.force
236 232
  selected_fields = ["name", "pinst_list"]
237 233

  
238 234
  op = opcodes.OpQueryNodes(output_fields=selected_fields, names=args)
239
  result = SubmitOpCode(op)
235
  result = SubmitOpCode(op, cl=cl)
240 236
  node, pinst = result[0]
241 237

  
242 238
  if not pinst:
......
251 247
                               (",".join("'%s'" % name for name in pinst))):
252 248
    return 2
253 249

  
254
  good_cnt = bad_cnt = 0
250
  jex = JobExecutor(cl=cl)
255 251
  for iname in pinst:
256 252
    op = opcodes.OpFailoverInstance(instance_name=iname,
257 253
                                    ignore_consistency=opts.ignore_consistency)
258
    try:
259
      ToStdout("Failing over instance %s", iname)
260
      SubmitOpCode(op)
261
      ToStdout("Instance %s has been failed over", iname)
262
      good_cnt += 1
263
    except errors.GenericError, err:
264
      nret, msg = FormatError(err)
265
      retcode |= nret
266
      ToStderr("Error failing over instance %s: %s", iname, msg)
267
      bad_cnt += 1
268

  
269
  if retcode == 0:
270
    ToStdout("All %d instance(s) failed over successfully.", good_cnt)
254
    jex.QueueJob(iname, op)
255
  results = jex.GetResults()
256
  bad_cnt = len([row for row in results if not row[0]])
257
  if bad_cnt == 0:
258
    ToStdout("All %d instance(s) failed over successfully.", len(results))
271 259
  else:
272 260
    ToStdout("There were errors during the failover:\n"
273
             "%d error(s) out of %d instance(s).", bad_cnt, good_cnt + bad_cnt)
261
             "%d error(s) out of %d instance(s).", bad_cnt, len(results))
274 262
  return retcode
275 263

  
276 264

  

Also available in: Unified diff