]> gitweb.michael.orlitzky.com - djbdns-logparse.git/blob - djbdns/tinydns.py
52b06a4e338ea8acc2ce51da8ed90342b88ad80f
[djbdns-logparse.git] / djbdns / tinydns.py
1 from re import compile
2 from typing import Optional
3 from djbdns.common import *
4
# The "hex4" pattern matches a string of exactly four lowercase
# hexadecimal digits. This is used, for example, by tinydns to encode
# the query type identifier.
hex4_pat = r'[0-9a-f]{4}'

# The IP pattern matches a string of exactly 8 or exactly 32 lowercase
# hexadecimal characters, which correspond to IPv4 and IPv6 addresses,
# respectively, in tinydns logs. A plain "{8,32}" quantifier would
# also accept every length in between, none of which is a valid
# address. The alternation is wrapped in a non-capturing group so that
# the pattern can be interpolated into a larger regex without changing
# its meaning or its group numbering.
ip_pat = r'(?:[0-9a-f]{32}|[0-9a-f]{8})'
14
# The compiled regex for one tinydns log line. Its seven capture
# groups are, in order: the timestamp, the client IP, the client
# port, the query id, the one-character result code, the query type,
# and finally the rest of the line (the name that was queried). The
# adjacent string literals below are concatenated into one pattern.
tinydns_log_re = compile(
    rf'({timestamp_pat}) '
    rf'({ip_pat}):({hex4_pat}):({hex4_pat}) '
    r'([\+\-IC/]) '
    rf'({hex4_pat}) '
    r'(.*)'
)
19
# Lookup table from the one-character result code in a tinydns log
# line to a human-readable explanation of why the query was dropped.
# Four of the codes below mean "dropped". The fifth, "+", means that a
# response WAS sent; it maps to None and is kept in the table only so
# that the formatting code can look up any code without a special case.
query_drop_reason = {
    "+": None,              # not dropped: a response was sent
    "-": "no authority",
    "I": "invalid query",
    "C": "invalid class",
    "/": "couldn't parse",
}
32
33
def handle_tinydns_log(line: str) -> Optional[str]:
    r"""
    Handle a single log line if it matches the ``tinydns_log_re`` regex.

    The hexadecimal port, query id, and query type fields are decoded
    to decimal, the IP address is passed through ``convert_ip``, and
    the query type is replaced by its name from ``query_type`` when
    one is known there.

    Parameters
    ----------

    line : string
        The log line that might match ``tinydns_log_re``.

    Returns
    -------

    Either the human-readable string if the log line was handled (that
    is, if it was really a tinydns log line), or ``None`` if it was
    not.

    Examples
    --------

        >>> line = "2022-09-14 21:04:40.206516500 7f000001:9d61:be69 - 0001 www.example.com"
        >>> handle_tinydns_log(line)
        '2022-09-14 21:04:40.206516500 dropped query (no authority) from 127.0.0.1:40289 (id 48745): a www.example.com'

        >>> line = "2022-09-14 21:04:40.206516500 7f000001:9d61:be69 + 0001 www.example.com"
        >>> handle_tinydns_log(line)
        '2022-09-14 21:04:40.206516500 sent response to 127.0.0.1:40289 (id 48745): a www.example.com'

        >>> line = "2022-09-14 21:04:40.206516500 7f000001:9d61:be69 / 0000 ."
        >>> handle_tinydns_log(line)
        "2022-09-14 21:04:40.206516500 dropped query (couldn't parse) from 127.0.0.1:40289"

        >>> line = "this line is nonsense"
        >>> handle_tinydns_log(line)

    """
    match = tinydns_log_re.match(line)
    if not match:
        return None

    # The locals are deliberately not called "id" and "type", which
    # would shadow the builtins of the same name.
    (timestamp, ip_hex, port_hex, id_hex, code, type_hex, name) = match.groups()
    ip = convert_ip(ip_hex)
    port = int(port_hex, 16)
    query_id = int(id_hex, 16)

    # Convert the "type" field to a human-readable record type name
    # using the query_type dictionary. If the right name isn't present
    # in the dictionary, we use the (decimal) type id instead.
    type_number = int(type_hex, 16)                         # "001c" -> 28
    type_name = query_type.get(type_number, type_number)    # 28 -> "aaaa"

    # These details are appended whenever the query could be parsed.
    details = f" (id {query_id}): {type_name} {name}"

    if code == "+":
        return f"{timestamp} sent response to {ip}:{port}" + details

    # Every remaining code means that the query was dropped.
    reason = query_drop_reason[code]
    result = f"{timestamp} dropped query ({reason}) from {ip}:{port}"
    if code != "/":
        # If the query can actually be parsed, the log line is a
        # bit more informative than it would have been otherwise.
        result += details
    return result