diff --git a/idstools/rule.py b/idstools/rule.py index 17a5231..0134497 100644 --- a/idstools/rule.py +++ b/idstools/rule.py @@ -73,6 +73,8 @@ class Rule(dict): - **rev**: The revision of the rule as an integer - **msg**: The rule message as a string - **flowbits**: List of flowbit options in the rule + - **xbits**: List of xbit options in the rule + - **flowints**: List of flowint options in the rule - **metadata**: Metadata values as a list - **references**: References as a list - **classtype**: The classification type @@ -96,6 +98,8 @@ def __init__(self, enabled=None, action=None, group=None): self["rev"] = None self["msg"] = None self["flowbits"] = [] + self["xbits"] = [] + self["flowints"] = [] self["metadata"] = [] self["references"] = [] self["classtype"] = None @@ -302,6 +306,10 @@ def parse(buf, group=None): rule[name] += [v.strip() for v in val.split(",")] elif name == "flowbits": rule.flowbits.append(val) + elif name == "xbits": + rule.xbits.append(val) + elif name == "flowint": + rule.flowints.append(val) elif name == "reference": rule.references.append(val) elif name == "msg": diff --git a/tests/test_rule.py b/tests/test_rule.py index eb17211..50dbe71 100644 --- a/tests/test_rule.py +++ b/tests/test_rule.py @@ -50,6 +50,8 @@ def test_parse1(self): self.assertEqual(len(rule.flowbits), 2) self.assertEquals(rule.flowbits[0], "isset,somebit") self.assertEquals(rule.flowbits[1], "unset,otherbit") + self.assertEqual(len(rule.xbits), 0) + self.assertEqual(len(rule.flowints), 0) self.assertEquals(rule.classtype, "trojan-activity") def test_disable_rule(self): @@ -162,7 +164,7 @@ def test_scratch(self): print("%s" % reassembled) self.assertEquals(rule_string, reassembled) - + def test_parse_message_with_semicolon(self): rule_string = u"""alert ip any any -> any any (msg:"TEST RULE\; and some"; content:"uid=0|28|root|29|"; tag:session,5,packets; classtype:bad-unknown; sid:10000000; rev:1;)""" rule = idstools.rule.parse(rule_string) @@ -202,3 +204,34 @@ def test_parse(self): rs = u"""alert any [1.1.1.1, !2.2.2.2] any -> any [1,2,3] (msg:"TEST"; sid:1; rev:1;)""" rule = idstools.rule.parse(rs) self.assertIsNotNone(rule) + + def test_parse_flowint_rule(self): + rs = u"""alert tcp any any -> any any (msg:"Counting Usernames"; content:"jonkman"; flowint: usernamecount, +, 1; noalert;)""" + rule = idstools.rule.parse(rs) + self.assertEqual(len(rule.flowints), 1) + self.assertEqual(rule.flowints[0], "usernamecount, +, 1") + + rs = """alert tcp any any -> any any (msg:"More than Five Usernames!"; content:"jonkman"; flowint: usernamecount, +, 1; flowint:usernamecount, >, 5;)""" + rule = idstools.rule.parse(rs) + self.assertEqual(len(rule.flowints), 2) + self.assertEqual(rule.flowints[0], "usernamecount, +, 1") + self.assertEqual(rule.flowints[1], "usernamecount, >, 5") + + rs = """alert tcp any any -> any any (msg:"Setting a flowint counter"; content:"GET"; flowint:myvar, notset; flowint:maxvar,notset; flowint:myvar,=,1; flowint: maxvar,=,6;)""" + rule = idstools.rule.parse(rs) + self.assertEqual(len(rule.flowints), 4) + self.assertEqual(rule.flowints[0], "myvar, notset") + self.assertEqual(rule.flowints[1], "maxvar,notset") + self.assertEqual(rule.flowints[2], "myvar,=,1") + self.assertEqual(rule.flowints[3], "maxvar,=,6") + + def test_parse_xbits_rule(self): + rs = u"""drop ssh any any -> $MYSERVER 22 (msg:"DROP libssh incoming"; flow:to_server,established; ssh.softwareversion:"libssh"; xbits:set, badssh, track ip_src, expire 3600; sid:4000000005;)""" + rule = idstools.rule.parse(rs) + self.assertEqual(len(rule.xbits), 1) + self.assertEqual(rule.xbits[0], "set, badssh, track ip_src, expire 3600") + + rs = """drop ssh any any -> $MYSERVER 22 (msg:"DROP BLACKLISTED"; xbits:isset, badssh, track ip_src; sid:4000000006;)""" + rule = idstools.rule.parse(rs) + self.assertEqual(len(rule.xbits), 1) + self.assertEqual(rule.xbits[0], "isset, badssh, track ip_src")