]> gitweb.michael.orlitzky.com - djbdns-logparse.git/blobdiff - djbdns/dnscache.py
djbdns/dnscache.py: de-indent a block that doesn't need to be guarded.
[djbdns-logparse.git] / djbdns / dnscache.py
index ff1e4da66490aa27187710fe92168fb641259b31..3dd6d5f4ac6282bd6aff3aa6610d4a84ef797715 100644 (file)
@@ -1,9 +1,14 @@
-from re import compile
+r"""
+Functions and data specific to dnscache logs.
+"""
+# Don't clobber the global compile() with a named import.
+import re
+
 from typing import Optional
-from djbdns.common import *
+from djbdns.common import QUERY_TYPE_NAME, TIMESTAMP_PAT, convert_ip
 
 # The regex to match dnscache log lines.
-dnscache_log_re = compile(fr'({timestamp_pat}) (\w+)(.*)')
+DNSCACHE_LOG_RE = re.compile(fr'({TIMESTAMP_PAT}) (\w+)(.*)')
 
 
 def decode_client(words : list, i : int):
@@ -57,8 +62,8 @@ def decode_client(words : list, i : int):
 
     if len(chunks) == 3:
         # For a "query" entry's clientip:clientport:id field.
-        id = int(chunks[2], 16)
-        words[i] += f" (id {id})"
+        packet_id = int(chunks[2], 16)
+        words[i] += f" (id {packet_id})"
 
 def decode_ip(words : list, i : int):
     r"""
@@ -171,18 +176,51 @@ def decode_serial(words : list, i : int):
     words[i] = f"#{words[i]}"
 
 def decode_type(words : list, i : int):
+    r"""
+    Helper function to decode the type field in a dnscache log
+    entry.
+
+    A single "type" field is present in cached, nodata, query, rr, and
+    tx entries. Unlike with tinydns entries, dnscache logs have
+    this field already in decimal, so we just look up the
+    corresponding name in the query type map.
+
+    Parameters
+    ----------
+
+    words : list
+        A list with the "type" string at index ``i``
+
+    i : int
+        The index of the type field within ``words``
+
+    Returns
+    -------
+
+    Nothing; the ``i``th entry in the ``words`` list is modified
+    in-place.
+
+    Examples
+    --------
+
+        >>> words = ["2", "7f000001:b848:0f0b", "16", "example.com."]
+        >>> decode_type(words, 2)
+        >>> words
+        ['2', '7f000001:b848:0f0b', 'txt', 'example.com.']
+
+    """
     qt = words[i]
-    words[i] = query_type.get(int(qt), qt)
+    words[i] = QUERY_TYPE_NAME.get(int(qt), qt)
 
 def handle_dnscache_log(line : str) -> Optional[str]:
     r"""
-    Handle a single log line if it matches the ``dnscache_log_re`` regex.
+    Handle a single log line if it matches the ``DNSCACHE_LOG_RE`` regex.
 
     Parameters
     ----------
 
     line : string
-        The log line that might match ``dnscache_log_re``.
+        The log line that might match ``DNSCACHE_LOG_RE``.
 
     Returns
     -------
@@ -218,7 +256,7 @@ def handle_dnscache_log(line : str) -> Optional[str]:
         >>> handle_dnscache_log(line)
 
     """
-    match = dnscache_log_re.match(line)
+    match = DNSCACHE_LOG_RE.match(line)
     if not match:
         return None
 
@@ -254,21 +292,24 @@ def handle_dnscache_log(line : str) -> Optional[str]:
         decode_ttl(words, 1)
         if words[2] not in ("cname", "mx", "ns", "ptr", "soa"):
             decode_type(words, 2)
-            if words[2] == "a":         # decode answer to an A query
-                decode_ip(words, 4)
-            if words[2] == "txt":       # text record
-                response = words[4]
-                if response.endswith("..."):
-                    ellipsis = "..."
-                    response = response[0:-3]
-                else:
-                    ellipsis = ""
-                length = int(response[0:2], 16)
-                chars = []
-                for i in range(1, len(response)//2):
-                    chars.append(chr(int(response[2*i : (2*i)+2], 16)))
-                txt = "".join(chars)
-                words[4] = f"{length}:\"{txt}{ellipsis}\""
+
+        if words[2] == "a":
+            # Decode the response to an 'A' query
+            decode_ip(words, 4)
+        if words[2] == "txt":
+            # Decode the TXT record's data from hex to ASCII.
+            response = words[4]
+            if response.endswith("..."):
+                ellipsis = "..."
+                response = response[0:-3]
+            else:
+                ellipsis = ""
+            length = int(response[0:2], 16)
+            chars = []
+            for i in range(1, len(response)//2):
+                chars.append(chr(int(response[2*i : (2*i)+2], 16)))
+            txt = "".join(chars)
+            words[4] = f"{length}:\"{txt}{ellipsis}\""
 
     elif event == "sent":
         decode_serial(words, 0)