r"""
Functions and data specific to dnscache logs.
"""
# Don't clobber the global compile() with a named import.
import re

from typing import Optional

from djbdns.common import QUERY_TYPE_NAME, TIMESTAMP_PAT, convert_ip

# The regex to match dnscache log lines. Its three groups are the
# timestamp, the event name (a single word), and everything after the
# event name.
DNSCACHE_LOG_RE = re.compile(fr'({TIMESTAMP_PAT}) (\w+)(.*)')


def decode_client(words: list, i: int) -> None:
    r"""
    Helper function to decode the client field in a dnscache log
    entry.

    There are two possible formats for the client field,

      1. clientip:clientport, used by tcpopen/tcpclose entries,
      2. clientip:clientport:id, used by "query" entries.

    We convert each part from hex to decimal, and in the second
    format, separate the packet id from the client information.

    Parameters
    ----------

    words : list
        The ``words`` list (a list of fields) from
        :func:`handle_dnscache_log`.

    i : int
        The index of the client field within ``words``

    Returns
    -------

    Nothing; the ``i``th entry in the ``words`` list is modified
    in-place.

    Examples
    --------

        >>> words = ["foo", "bar", "7f000001:9253", "quux"]
        >>> decode_client(words, 2)
        >>> words
        ['foo', 'bar', '127.0.0.1:37459', 'quux']

        >>> words = ["foo", "7f000001:a3db:4fb9", "bar", "quux"]
        >>> decode_client(words, 1)
        >>> words
        ['foo', '127.0.0.1:41947 (id 20409)', 'bar', 'quux']

    """
    chunks = words[i].split(":")

    ip = convert_ip(chunks[0])
    port = int(chunks[1], 16)
    words[i] = f"{ip}:{port}"

    if len(chunks) == 3:
        # For a "query" entry's clientip:clientport:id field.
        packet_id = int(chunks[2], 16)
        words[i] += f" (id {packet_id})"


def decode_ip(words: list, i: int) -> None:
    r"""
    Helper function to decode the ip field in a dnscache log
    entry.

    A single "serverip" field is present in the lame, nodata,
    nxdomain, and rr entry types. We convert it from hex to decimal.

    Parameters
    ----------

    words : list
        The ``words`` list (a list of fields) from
        :func:`handle_dnscache_log`.

    i : int
        The index of the ip field within ``words``

    Returns
    -------

    Nothing; the ``i``th entry in the ``words`` list is modified
    in-place.

    Examples
    --------

        >>> words = ["foo", "bar", "7f000001", "quux"]
        >>> decode_ip(words, 2)
        >>> words
        ['foo', 'bar', '127.0.0.1', 'quux']

        >>> words = ["foo", "00000000000000000000ffff7f000001", "bar", "quux"]
        >>> decode_ip(words, 1)
        >>> words
        ['foo', '0000:0000:0000:0000:0000:ffff:7f00:0001', 'bar', 'quux']

    """
    words[i] = convert_ip(words[i])


def decode_ttl(words: list, i: int) -> None:
    r"""
    Helper function to decode the ttl field in a dnscache log
    entry.

    A single "ttl" field is present in the nodata, nxdomain, and
    rr entry types. We prefix it with "TTL=" so that its meaning
    is clear in the human-readable logs.

    Parameters
    ----------

    words : list
        The ``words`` list (a list of fields) from
        :func:`handle_dnscache_log`.

    i : int
        The index of the ttl field within ``words``

    Returns
    -------

    Nothing; the ``i``th entry in the ``words`` list is modified
    in-place.

    Examples
    --------

        >>> words = ["c0a80101", "20865", "1", "www.example.com.", "5db8d822"]
        >>> decode_ttl(words, 1)
        >>> words
        ['c0a80101', 'TTL=20865', '1', 'www.example.com.', '5db8d822']

    """
    words[i] = f"TTL={words[i]}"


def decode_serial(words: list, i: int) -> None:
    r"""
    Helper function to decode the serial field in a dnscache log
    entry.

    A single "serial" field is present in the drop and query entry
    types. It's already in decimal; we simply prefix it with a hash.

    Parameters
    ----------

    words : list
        The ``words`` list (a list of fields) from
        :func:`handle_dnscache_log`.

    i : int
        The index of the serial field within ``words``

    Returns
    -------

    Nothing; the ``i``th entry in the ``words`` list is modified
    in-place.

    Examples
    --------

        >>> words = ["1", "7f000001:a3db:4fb9", "1", "www.example.com."]
        >>> decode_serial(words, 0)
        >>> words
        ['#1', '7f000001:a3db:4fb9', '1', 'www.example.com.']

    """
    words[i] = f"#{words[i]}"


def decode_type(words: list, i: int) -> None:
    r"""
    Helper function to decode the type field in a dnscache log
    entry.

    A single "type" field is present in cached, nodata, query, rr, and
    tx entries. Unlike with tinydns entries, dnscache logs have this
    field already in decimal, so we just look up the corresponding
    name in the query type map.

    If the type number has no entry in the query type map, the field
    is left as the original decimal string instead of raising an
    error, so that a single unusual record type does not prevent the
    rest of the line (or the rest of the log) from being processed.

    Parameters
    ----------

    words : list
        A list with the "type" string at index ``i``

    i : int
        The index of the type field within ``words``

    Returns
    -------

    Nothing; the ``i``th entry in the ``words`` list is modified
    in-place.

    Raises
    ------

    ValueError
        If the field is not a decimal integer.

    Examples
    --------

        >>> words = ["2", "7f000001:b848:0f0b", "16", "example.com."]
        >>> decode_type(words, 2)
        >>> words
        ['2', '7f000001:b848:0f0b', 'txt', 'example.com.']

    """
    qt = words[i]
    try:
        name = QUERY_TYPE_NAME[int(qt)]
    except (KeyError, IndexError):
        # No name is known for this type number; keep the number.
        name = qt
    words[i] = name


def _decode_txt(response: str) -> str:
    r"""
    Decode the hex-encoded data of a TXT record into readable text.

    The first byte (two hex digits) is reported as a decimal length,
    and each remaining byte is converted to the character with that
    code. A trailing "..." on the input is preserved at the end of
    the decoded text.

    Parameters
    ----------

    response : string
        The hex string from the data field of an "rr" entry,
        optionally ending in "...".

    Returns
    -------

    A string of the form ``length:"text"``.

    Examples
    --------

        >>> _decode_txt("0568656c6c6f")
        '5:"hello"'

        >>> _decode_txt("0568656c...")
        '5:"hel..."'

    """
    ellipsis = ""
    if response.endswith("..."):
        ellipsis = "..."
        response = response[0:-3]

    length = int(response[0:2], 16)

    # Skip byte zero (the length) and convert the rest, two hex digits
    # at a time. A dangling odd hex digit at the end is ignored.
    txt = "".join(
        chr(int(response[2*k : (2*k)+2], 16))
        for k in range(1, len(response)//2)
    )
    return f"{length}:\"{txt}{ellipsis}\""


def handle_dnscache_log(line: str) -> Optional[str]:
    r"""
    Handle a single log line if it matches the ``DNSCACHE_LOG_RE`` regex.

    The line is split into a timestamp, an event name, and a list of
    whitespace-separated fields. The fields are then decoded
    according to the event name; events that are not recognized have
    their fields passed through unchanged.

    Parameters
    ----------

    line : string
        The log line that might match ``DNSCACHE_LOG_RE``.

    Returns
    -------

    Either the human-readable string if the log line was handled (that
    is, if it was really a dnscache log line), or ``None`` if it was
    not.

    Examples
    --------

        >>> line = "2022-09-15 18:37:33.863805500 query 1 7f000001:a3db:4fb9 1 www.example.com."
        >>> handle_dnscache_log(line)
        '2022-09-15 18:37:33.863805500 query #1 127.0.0.1:41947 (id 20409) a www.example.com.'

        >>> line = "2022-09-15 18:37:33.863874500 tx 0 1 www.example.com. . c0a80101"
        >>> handle_dnscache_log(line)
        '2022-09-15 18:37:33.863874500 tx g=0 a www.example.com. . 192.168.1.1'

        >>> line = "2022-09-15 18:37:33.878529500 rr c0a80101 20865 1 www.example.com. 5db8d822"
        >>> handle_dnscache_log(line)
        '2022-09-15 18:37:33.878529500 rr 192.168.1.1 TTL=20865 a www.example.com. 93.184.216.34'

        >>> line = "2022-09-15 18:37:33.878532500 stats 1 43 1 0"
        >>> handle_dnscache_log(line)
        '2022-09-15 18:37:33.878532500 stats count=1 motion=43 udp-active=1 tcp-active=0'

        >>> line = "2022-09-15 18:37:33.878602500 sent 1 49"
        >>> handle_dnscache_log(line)
        '2022-09-15 18:37:33.878602500 sent #1 49'

        >>> line = "this line is nonsense"
        >>> handle_dnscache_log(line)

    """
    match = DNSCACHE_LOG_RE.match(line)
    if not match:
        return None

    (timestamp, event, data) = match.groups()

    words = data.split()
    if event == "cached":
        # These three appear by name in place of a numeric type.
        if words[0] not in ("cname", "ns", "nxdomain"):
            decode_type(words, 0)

    elif event == "drop":
        decode_serial(words, 0)

    elif event == "lame":
        decode_ip(words, 0)

    elif event == "nodata":
        decode_ip(words, 0)
        decode_ttl(words, 1)
        decode_type(words, 2)

    elif event == "nxdomain":
        decode_ip(words, 0)
        decode_ttl(words, 1)

    elif event == "query":
        decode_serial(words, 0)
        decode_client(words, 1)
        decode_type(words, 2)

    elif event == "rr":
        decode_ip(words, 0)
        decode_ttl(words, 1)
        # These types appear by name; anything else is numeric.
        if words[2] not in ("cname", "mx", "ns", "ptr", "soa"):
            decode_type(words, 2)
            if words[2] == "a":
                # Decode the response to an 'A' query
                decode_ip(words, 4)
            elif words[2] == "txt":
                # Decode the TXT record's data from hex to ASCII.
                words[4] = _decode_txt(words[4])

    elif event == "sent":
        decode_serial(words, 0)

    elif event == "stats":
        words[0] = f"count={words[0]}"
        words[1] = f"motion={words[1]}"
        words[2] = f"udp-active={words[2]}"
        words[3] = f"tcp-active={words[3]}"

    elif event == "tx":
        words[0] = f"g={words[0]}"
        decode_type(words, 1)
        # words[2] = name
        # words[3] = control (domain for which these servers are believed
        #            to be authoritative)
        for i in range(4, len(words)):
            decode_ip(words, i)

    elif event in ("tcpopen", "tcpclose"):
        decode_client(words, 0)

    # Reconstitute "data" (i.e. everything after the timestamp and the
    # event) from "words", which was originally obtained by splitting
    # "data".
    data = " ".join(words)
    return f"{timestamp} {event} {data}"