r"""
Functions and data specific to tinydns logs.
"""
# Don't clobber the global compile() with a named import.
import re

from typing import Optional
from djbdns.common import QUERY_TYPE_NAME, TIMESTAMP_PAT, convert_ip

# The "hex4" pattern matches a string of four hexadecimal digits. This
# is used, for example, by tinydns to encode the query type
# identifier.
HEX4_PAT = r'[0-9a-f]{4}'

# The IP pattern is meant for the 8- and 32-character hexadecimal
# strings that correspond to IPv4 and IPv6 addresses, respectively, in
# tinydns logs. Note that the pattern itself is looser than that: it
# accepts any run of between 8 and 32 hexadecimal characters.
IP_PAT = r'[0-9a-f]{8,32}'

# The regex to match tinydns log lines. Its seven groups are, in
# order: timestamp, client ip, client port, request id, result code,
# query type, and the queried name.
TINYDNS_LOG_RE = re.compile(
    rf'({TIMESTAMP_PAT}) ({IP_PAT}):({HEX4_PAT}):({HEX4_PAT}) ([\+\-IC/]) ({HEX4_PAT}) (.*)'
)

# tinydns can drop a query for one of three reasons; this dictionary
# maps the symbol that gets logged in each case to a human-readable
# reason. We include the "+" case here, indicating that the query was
# NOT dropped, so that every code the regex can match has an entry.
QUERY_DROP_REASON = {
    "+": None,
    "-": "no authority",
    "I": "invalid query",
    "C": "invalid class",
    "/": "couldn't parse"
}


def handle_tinydns_log(line : str) -> Optional[str]:
    r"""
    Handle a single log line if it matches the ``TINYDNS_LOG_RE`` regex.

    The hexadecimal port, request id, and query type fields are
    converted to decimal, and the query type is then replaced by its
    name from ``QUERY_TYPE_NAME``. If the type has no entry in that
    dictionary, its decimal id is shown instead.

    Parameters
    ----------

    line : string
        The log line that might match ``TINYDNS_LOG_RE``.

    Returns
    -------

    Either the human-readable string if the log line was handled (that
    is, if it was really a tinydns log line), or ``None`` if it was
    not.

    Examples
    --------

        >>> line = "2022-09-14 21:04:40.206516500 7f000001:9d61:be69 - 0001 www.example.com"
        >>> handle_tinydns_log(line)
        '2022-09-14 21:04:40.206516500 dropped query (no authority) from 127.0.0.1:40289 (id 48745): a www.example.com'

        >>> line = "this line is nonsense"
        >>> handle_tinydns_log(line)

    """
    match = TINYDNS_LOG_RE.match(line)
    if not match:
        return None

    (timestamp, ip, port, request_id, code, query_type, name) = match.groups()
    ip = convert_ip(ip)
    port = int(port, 16)
    request_id = int(request_id, 16)

    # Convert the "type" field to a human-readable record type name
    # using the QUERY_TYPE_NAME dictionary. If the name isn't present
    # in the dictionary, fall back to the (decimal) type id rather
    # than printing "None".
    type_id = int(query_type, 16)                       # "001c" -> 28
    query_type = QUERY_TYPE_NAME.get(type_id, type_id)  # 28 -> "aaaa"

    if code == "+":
        # The query was answered, so there is no drop reason to report.
        return (f"{timestamp} sent response to {ip}:{port} "
                f"(id {request_id}): {query_type} {name}")

    # Every other code that the regex accepts is a key of
    # QUERY_DROP_REASON, so this lookup cannot fail.
    reason = QUERY_DROP_REASON[code]
    result = f"{timestamp} dropped query ({reason}) from {ip}:{port}"
    if code != "/":
        # If the query can actually be parsed, the log line is a
        # bit more informative than it would have been otherwise.
        result += f" (id {request_id}): {query_type} {name}"

    return result