-from re import compile
+r"""
+Functions and data specific to dnscache logs.
+"""
+# Don't clobber the global compile() with a named import.
+import re
+
from typing import Optional
-from djbdns.common import *
+from djbdns.common import QUERY_TYPE_NAME, TIMESTAMP_PAT, convert_ip
# The regex to match dnscache log lines.
-dnscache_log_re = compile(fr'({timestamp_pat}) (\w+)(.*)')
+DNSCACHE_LOG_RE = re.compile(fr'({TIMESTAMP_PAT}) (\w+)(.*)')
def decode_client(words : list, i : int):
if len(chunks) == 3:
# For a "query" entry's clientip:clientport:id field.
- id = int(chunks[2], 16)
- words[i] += f" (id {id})"
+ packet_id = int(chunks[2], 16)
+ words[i] += f" (id {packet_id})"
def decode_ip(words : list, i : int):
r"""
words[i] = f"#{words[i]}"
def decode_type(words : list, i : int):
+ r"""
+ Helper function to decode the type field in a dnscache log
+ entry.
+
+ A single "type" field is present in cached, nodata, query, rr, and
+ tx entries. Unlike with tinydns entries, dnscache logs have
+ this field already in decimal, so we just look up the
+ corresponding name in the query type map.
+
+ Parameters
+ ----------
+
+ words : list
+ A list with the "type" string at index ``i``
+
+ i : int
+ The index of the type field within ``words``
+
+ Returns
+ -------
+
+ Nothing; the ``i``th entry in the ``words`` list is modified
+ in-place.
+
+ Examples
+ --------
+
+ >>> words = ["2", "7f000001:b848:0f0b", "16", "example.com."]
+ >>> decode_type(words, 2)
+ >>> words
+ ['2', '7f000001:b848:0f0b', 'txt', 'example.com.']
+
+ """
qt = words[i]
- words[i] = query_type.get(int(qt), qt)
+ words[i] = QUERY_TYPE_NAME[int(qt)]
def handle_dnscache_log(line : str) -> Optional[str]:
r"""
- Handle a single log line if it matches the ``dnscache_log_re`` regex.
+ Handle a single log line if it matches the ``DNSCACHE_LOG_RE`` regex.
Parameters
----------
line : string
- The log line that might match ``dnscache_log_re``.
+ The log line that might match ``DNSCACHE_LOG_RE``.
Returns
-------
>>> handle_dnscache_log(line)
"""
- match = dnscache_log_re.match(line)
+ match = DNSCACHE_LOG_RE.match(line)
if not match:
return None
decode_ttl(words, 1)
if words[2] not in ("cname", "mx", "ns", "ptr", "soa"):
decode_type(words, 2)
- if words[2] == "a": # decode answer to an A query
- decode_ip(words, 4)
- if words[2] == "txt": # text record
- response = words[4]
- if response.endswith("..."):
- ellipsis = "..."
- response = response[0:-3]
- else:
- ellipsis = ""
- length = int(response[0:2], 16)
- chars = []
- for i in range(1, len(response)//2):
- chars.append(chr(int(response[2*i : (2*i)+2], 16)))
- txt = "".join(chars)
- words[4] = f"{length}:\"{txt}{ellipsis}\""
+
+ if words[2] == "a":
+ # Decode the response to an 'A' query
+ decode_ip(words, 4)
+ if words[2] == "txt":
+ # Decode the TXT record's data from hex to ASCII.
+ response = words[4]
+ if response.endswith("..."):
+ ellipsis = "..."
+ response = response[0:-3]
+ else:
+ ellipsis = ""
+ length = int(response[0:2], 16)
+ chars = []
+ for i in range(1, len(response)//2):
+ chars.append(chr(int(response[2*i : (2*i)+2], 16)))
+ txt = "".join(chars)
+ words[4] = f"{length}:\"{txt}{ellipsis}\""
elif event == "sent":
decode_serial(words, 0)