import re, typing
from struct import pack
-from subprocess import Popen, PIPE
from time import strftime, gmtime
}
-def convert_ip(ip : str):
+def convert_ip(ip : str) -> str:
"""
- Convert a hex string representing an IP address to conventional
- human-readable form, ie. dotted-quad decimal for IPv4, and
- 8 colon-separated hex shorts for IPv6.
+ Convert a hex string representing an IP address to
+ human-readable form.
+
+ Parameters
+ ----------
+
+ ip : str
+ The hexadecimal representation of either an IPv4 or an IPv6
+ address.
+
+ Returns
+ -------
+
+ The usual decimal dotted-quad representation is returned for an
+ IPv4 address. IPv6 addresses are returned almost as-is, but with
+ colons inserted in the appropriate places, between every four
+ characters.
Examples
--------
'127.0.0.1'
>>> convert_ip("00000000000000000000ffff7f000001")
'0000:0000:0000:0000:0000:ffff:7f00:0001'
-
"""
if len(ip) == 8:
# IPv4, eg. "7f000001" -> "7f 00 00 01" -> "127.0.0.1"
return ":".join([ip[(4*i) : (4*i+4)] for i in range(8)])
-def decode_client(words, i):
+def decode_client(words : list, i : int):
+ r"""
+ Helper function to decode the client field in a dnscache log
+ entry.
+
+ There are two possible formats for the client field,
+
+ 1. clientip:clientport, used by tcpopen/tcpclose entries,
+ 2. clientip:clientport:id, used by "query" entries.
+
+ Parameters
+ ----------
+
+ words : list
+ The ``words`` list (a list of fields) from
+ :func:`handle_dnscache_log`.
+
+ i : int
+ The index of the client field within ``words``
+
+ Returns
+ -------
+
+ Nothing; the ``i``th entry in the ``words`` list is modified
+ in-place.
+
+ Examples
+ --------
+
+ >>> words = ["foo", "bar", "7f000001:9253", "quux"]
+ >>> decode_client(words, 2)
+ >>> words
+ ['foo', 'bar', '127.0.0.1:37459', 'quux']
+
+ >>> words = ["foo", "7f000001:a3db:4fb9", "bar", "quux"]
+ >>> decode_client(words, 1)
+ >>> words
+ ['foo', '127.0.0.1:41947 (id 20409)', 'bar', 'quux']
+
+ """
chunks = words[i].split(":")
ip = convert_ip(chunks[0])
>>> remove(f.name)
"""
- # Open pipe to tai64nlocal: we will write lines of our input (the
- # raw log file) to it, and read log lines with readable timestamps
- # from it.
+ # Open a pipe to tai64nlocal. We'll write lines of our input file
+ # (the log file) to it, and read back the same lines but with
+ # friendly timestamps in them.
+ from subprocess import Popen, PIPE
tai = Popen(["tai64nlocal"], stdin=PIPE, stdout=PIPE, text=True, bufsize=0)
for line in file: