X-Git-Url: https://gitweb.michael.orlitzky.com/?a=blobdiff_plain;f=djbdns%2Fdnscache.py;h=450dbd6c5b73151265ebafe84cbb8c129498f03c;hb=f2dc3c2bcec782de9931ec881b6fd64d8a10956e;hp=785121b65d0541d04005654ebf0c59a7a7144dd7;hpb=0942cb75e6d2e73f81f48166053bf3bb1997cb91;p=djbdns-logparse.git diff --git a/djbdns/dnscache.py b/djbdns/dnscache.py index 785121b..450dbd6 100644 --- a/djbdns/dnscache.py +++ b/djbdns/dnscache.py @@ -1,9 +1,14 @@ -from re import compile +r""" +Functions and data specific to dnscache logs. +""" +# Don't clobber the global compile() with a named import. +import re + from typing import Optional -from djbdns.common import * +from djbdns.common import QUERY_TYPE_NAME, TIMESTAMP_PAT, convert_ip # The regex to match dnscache log lines. -dnscache_log_re = compile(fr'({timestamp_pat}) (\w+)(.*)') +DNSCACHE_LOG_RE = re.compile(fr'({TIMESTAMP_PAT}) (\w+)(.*)') def decode_client(words : list, i : int): @@ -57,8 +62,8 @@ def decode_client(words : list, i : int): if len(chunks) == 3: # For a "query" entry's clientip:clientport:id field. - id = int(chunks[2], 16) - words[i] += f" (id {id})" + packet_id = int(chunks[2], 16) + words[i] += f" (id {packet_id})" def decode_ip(words : list, i : int): r""" @@ -205,17 +210,17 @@ def decode_type(words : list, i : int): """ qt = words[i] - words[i] = query_type.get(int(qt), qt) + words[i] = QUERY_TYPE_NAME[int(qt)] def handle_dnscache_log(line : str) -> Optional[str]: r""" - Handle a single log line if it matches the ``dnscache_log_re`` regex. + Handle a single log line if it matches the ``DNSCACHE_LOG_RE`` regex. Parameters ---------- line : string - The log line that might match ``dnscache_log_re``. + The log line that might match ``DNSCACHE_LOG_RE``. Returns ------- @@ -251,7 +256,7 @@ def handle_dnscache_log(line : str) -> Optional[str]: >>> handle_dnscache_log(line) """ - match = dnscache_log_re.match(line) + match = DNSCACHE_LOG_RE.match(line) if not match: return None @@ -287,23 +292,24 @@ def handle_dnscache_log(line : str) -> Optional[str]: decode_ttl(words, 1) if words[2] not in ("cname", "mx", "ns", "ptr", "soa"): decode_type(words, 2) - if words[2] == "a": - # Decode the response to an 'A' query - decode_ip(words, 4) - if words[2] == "txt": - # Decode the TXT record's data from hex to ASCII. - response = words[4] - if response.endswith("..."): - ellipsis = "..." - response = response[0:-3] - else: - ellipsis = "" - length = int(response[0:2], 16) - chars = [] - for i in range(1, len(response)//2): - chars.append(chr(int(response[2*i : (2*i)+2], 16))) - txt = "".join(chars) - words[4] = f"{length}:\"{txt}{ellipsis}\"" + + if words[2] == "a": + # Decode the response to an 'A' query + decode_ip(words, 4) + if words[2] == "txt": + # Decode the TXT record's data from hex to ASCII. + response = words[4] + if response.endswith("..."): + ellipsis = "..." + response = response[0:-3] + else: + ellipsis = "" + length = int(response[0:2], 16) + chars = [] + for i in range(1, len(response)//2): + chars.append(chr(int(response[2*i : (2*i)+2], 16))) + txt = "".join(chars) + words[4] = f"{length}:\"{txt}{ellipsis}\"" elif event == "sent": decode_serial(words, 0)