import re
import string # pylint: disable-msg=W0402
+import logging
import pyparsing as pyp
from ganeti import errors
from ganeti import netutils
from ganeti import utils
+from ganeti import compat
# Logic operators with one or more operands, each of which is a filter on its
#: Characters used for detecting user-written filters (see L{MaybeFilter})
-FILTER_DETECTION_CHARS = frozenset("()=/!~" + string.whitespace)
+FILTER_DETECTION_CHARS = frozenset("()=/!~'\"\\" + string.whitespace)
+
+#: Characters used to detect globbing filters (see L{_CheckGlobbing})
+GLOB_DETECTION_CHARS = frozenset("*?")
def MakeSimpleFilter(namefield, values):
@rtype: list
"""
+ logging.debug("Parsing as query filter: %s", text)
+
if parser is None:
parser = BuildFilterParser()
" '%s': %s" % (text, err), err)
-def MaybeFilter(text):
- """Try to determine if a string is a filter or a name.
+def _IsHostname(text):
+ """Checks if a string could be a hostname.
- If in doubt, this function treats a text as a name.
-
- @type text: string
- @param text: String to be examined
@rtype: bool
"""
- # Quick check for punctuation and whitespace
- if frozenset(text) & FILTER_DETECTION_CHARS:
- return True
-
try:
netutils.Hostname.GetNormalizedName(text)
except errors.OpPrereqError:
- # Not a valid hostname, treat as filter
+ return False
+ else:
return True
- # Most probably a name
- return False
+
+def _CheckFilter(text):
+ """Checks if a string could be a filter.
+
+ @rtype: bool
+
+ """
+ return bool(frozenset(text) & FILTER_DETECTION_CHARS)
+
+
+def _CheckGlobbing(text):
+ """Checks if a string could be a globbing pattern.
+
+ @rtype: bool
+
+ """
+ return bool(frozenset(text) & GLOB_DETECTION_CHARS)
+
+
+def _MakeFilterPart(namefield, text):
+ """Generates filter for one argument.
+
+ """
+ if _CheckGlobbing(text):
+ return [OP_REGEXP, namefield, utils.DnsNameGlobPattern(text)]
+ else:
+ return [OP_EQUAL, namefield, text]
+
+
+def MakeFilter(args, force_filter):
+ """Try to make a filter from arguments to a command.
+
+ If the name could be a filter it is parsed as such. If it's just a globbing
+ pattern, e.g. "*.site", such a filter is constructed. As a last resort the
+ names are treated just as a plain name filter.
+
+ @type args: list of string
+ @param args: Arguments to command
+ @type force_filter: bool
+ @param force_filter: Whether to force treatment as a full-fledged filter
+ @rtype: list
+ @return: Query filter
+
+ """
+ if (force_filter or
+ (args and len(args) == 1 and _CheckFilter(args[0]))):
+ try:
+ (filter_text, ) = args
+ except (TypeError, ValueError):
+ raise errors.OpPrereqError("Exactly one argument must be given as a"
+ " filter")
+
+ result = ParseFilter(filter_text)
+ elif args:
+ result = [OP_OR] + map(compat.partial(_MakeFilterPart, "name"), args)
+ else:
+ result = None
+
+ return result
self.parser = qlang.BuildFilterParser()
def _Test(self, filter_, expected, expect_filter=True):
- if expect_filter:
- self.assertTrue(qlang.MaybeFilter(filter_),
- msg="'%s' was not recognized as a filter" % filter_)
- else:
- self.assertFalse(qlang.MaybeFilter(filter_),
- msg=("'%s' should not be recognized as a filter" %
- filter_))
+ self.assertEqual(qlang.MakeFilter([filter_], not expect_filter), expected)
self.assertEqual(qlang.ParseFilter(filter_, parser=self.parser), expected)
def test(self):
self.fail("Invalid filter '%s' did not raise exception" % filter_)
-class TestMaybeFilter(unittest.TestCase):
- def test(self):
- self.assertTrue(qlang.MaybeFilter(""))
- self.assertTrue(qlang.MaybeFilter("foo/bar"))
- self.assertTrue(qlang.MaybeFilter("foo==bar"))
-
- for i in set("()!~" + string.whitespace) | qlang.FILTER_DETECTION_CHARS:
- self.assertTrue(qlang.MaybeFilter(i),
- msg="%r not recognized as filter" % i)
-
- self.assertFalse(qlang.MaybeFilter("node1"))
- self.assertFalse(qlang.MaybeFilter("n-o-d-e"))
- self.assertFalse(qlang.MaybeFilter("n_o_d_e"))
- self.assertFalse(qlang.MaybeFilter("node1.example.com"))
- self.assertFalse(qlang.MaybeFilter("node1.example.com."))
+class TestMakeFilter(unittest.TestCase):
+ def testNoNames(self):
+ self.assertEqual(qlang.MakeFilter([], False), None)
+ self.assertEqual(qlang.MakeFilter(None, False), None)
+
+ def testPlainNames(self):
+ self.assertEqual(qlang.MakeFilter(["web1", "web2"], False),
+ [qlang.OP_OR, [qlang.OP_EQUAL, "name", "web1"],
+ [qlang.OP_EQUAL, "name", "web2"]])
+
+ def testForcedFilter(self):
+ for i in [None, [], ["1", "2"], ["", "", ""], ["a", "b", "c", "d"]]:
+ self.assertRaises(errors.OpPrereqError, qlang.MakeFilter, i, True)
+
+ # Glob pattern shouldn't parse as filter
+ self.assertRaises(errors.QueryFilterParseError,
+ qlang.MakeFilter, ["*.site"], True)
+
+ # Plain name parses as boolean filter
+ self.assertEqual(qlang.MakeFilter(["web1"], True), [qlang.OP_TRUE, "web1"])
+
+ def testFilter(self):
+ self.assertEqual(qlang.MakeFilter(["foo/bar"], False),
+ [qlang.OP_TRUE, "foo/bar"])
+ self.assertEqual(qlang.MakeFilter(["foo=='bar'"], False),
+ [qlang.OP_EQUAL, "foo", "bar"])
+ self.assertEqual(qlang.MakeFilter(["field=*'*.site'"], False),
+ [qlang.OP_REGEXP, "field",
+ utils.DnsNameGlobPattern("*.site")])
+
+ # Plain name parses as name filter, not boolean
+ for name in ["node1", "n-o-d-e", "n_o_d_e", "node1.example.com",
+ "node1.example.com."]:
+ self.assertEqual(qlang.MakeFilter([name], False),
+ [qlang.OP_OR, [qlang.OP_EQUAL, "name", name]])
+
+ # Invalid filters
+ for i in ["foo==bar", "foo+=1"]:
+ self.assertRaises(errors.QueryFilterParseError,
+ qlang.MakeFilter, [i], False)
+
+ def testGlob(self):
+ self.assertEqual(qlang.MakeFilter(["*.site"], False),
+ [qlang.OP_OR, [qlang.OP_REGEXP, "name",
+ utils.DnsNameGlobPattern("*.site")]])
+ self.assertEqual(qlang.MakeFilter(["web?.example"], False),
+ [qlang.OP_OR, [qlang.OP_REGEXP, "name",
+ utils.DnsNameGlobPattern("web?.example")]])
+ self.assertEqual(qlang.MakeFilter(["*.a", "*.b", "?.c"], False),
+ [qlang.OP_OR,
+ [qlang.OP_REGEXP, "name",
+ utils.DnsNameGlobPattern("*.a")],
+ [qlang.OP_REGEXP, "name",
+ utils.DnsNameGlobPattern("*.b")],
+ [qlang.OP_REGEXP, "name",
+ utils.DnsNameGlobPattern("?.c")]])
if __name__ == "__main__":