import re, typing
from struct import pack
-from subprocess import Popen, PIPE
from time import strftime, gmtime
}
-def convert_ip(ip : str):
+def convert_ip(ip : str) -> str:
"""
- Convert a hex string representing an IP address to conventional
- human-readable form, ie. dotted-quad decimal for IPv4, and
- 8 colon-separated hex shorts for IPv6.
+ Convert a hex string representing an IP address to
+ human-readable form.
+
+ Parameters
+ ----------
+
+ ip : str
+ The hexadecimal representation of either an IPv4 or an IPv6
+ address.
+
+ Returns
+ -------
+
+ The usual decimal dotted-quad representation is returned for an
+ IPv4 address. IPv6 addresses are returned almost as-is, but with
+ colons inserted in the appropriate places, between every four
+ characters.
Examples
--------
'127.0.0.1'
>>> convert_ip("00000000000000000000ffff7f000001")
'0000:0000:0000:0000:0000:ffff:7f00:0001'
-
"""
if len(ip) == 8:
# IPv4, eg. "7f000001" -> "7f 00 00 01" -> "127.0.0.1"
- return "%d.%d.%d.%d" % tuple(pack(">L", int(ip, 16)))
+ return ".".join(map(str, pack(">L", int(ip, 16))))
elif len(ip) == 32:
# IPv6 is actually simpler -- it's just a string-slicing operation.
return ":".join([ip[(4*i) : (4*i+4)] for i in range(8)])
-def decode_client(words, i):
+def decode_client(words : list, i : int):
+ r"""
+ Helper function to decode the client field in a dnscache log
+ entry.
+
+ There are two possible formats for the client field,
+
+ 1. clientip:clientport, used by tcpopen/tcpclose entries,
+ 2. clientip:clientport:id, used by "query" entries.
+
+ Parameters
+ ----------
+
+ words : list
+ The ``words`` list (a list of fields) from
+ :func:`handle_dnscache_log`.
+
+ i : int
+ The index of the client field within ``words``
+
+ Returns
+ -------
+
+ Nothing; the ``i``th entry in the ``words`` list is modified
+ in-place.
+
+ Examples
+ --------
+
+ >>> words = ["foo", "bar", "7f000001:9253", "quux"]
+ >>> decode_client(words, 2)
+ >>> words
+ ['foo', 'bar', '127.0.0.1:37459', 'quux']
+
+ >>> words = ["foo", "7f000001:a3db:4fb9", "bar", "quux"]
+ >>> decode_client(words, 1)
+ >>> words
+ ['foo', '127.0.0.1:41947 (id 20409)', 'bar', 'quux']
+
+ """
chunks = words[i].split(":")
- if len(chunks) == 2: # ip:port
- words[i] = "%s:%d" % (convert_ip(chunks[0]), int(chunks[1], 16))
- elif len(chunks) == 3:
- words[i] = "%s:%d (id %d)" % (convert_ip(chunks[0]),
- int(chunks[1], 16),
- int(chunks[2], 16))
+
+ ip = convert_ip(chunks[0])
+ port = int(chunks[1], 16)
+ words[i] = f"{ip}:{port}"
+
+ if len(chunks) == 3:
+ # For a "query" entry's clientip:clientport:id field.
+ id = int(chunks[2], 16)
+ words[i] += f" (id {id})"
def decode_ip(words, i):
words[i] = convert_ip(words[i])
def decode_ttl(words, i):
- words[i] = "TTL=%s" % words[i]
+ words[i] = f"TTL={words[i]}"
def decode_serial(words, i):
serial = int(words[i])
- words[i] = "#%d" % serial
+ words[i] = f"#{serial}"
def decode_type(words, i):
qt = words[i]
ellipsis = ""
length = int(response[0:2], 16)
chars = []
- for i in range(1, len(response)/2):
+ for i in range(1, len(response)//2):
chars.append(chr(int(response[2*i : (2*i)+2], 16)))
- words[4] = "%d:\"%s%s\"" % (length, "".join(chars), ellipsis)
+ txt = "".join(chars)
+ words[4] = f"{length}:\"{txt}{ellipsis}\""
elif event == "sent":
decode_serial(words, 0)
elif event == "stats":
- words[0] = "count=%s" % words[0]
- words[1] = "motion=%s" % words[1]
- words[2] = "udp-active=%s" % words[2]
- words[3] = "tcp-active=%s" % words[3]
+ words[0] = f"count={words[0]}"
+ words[1] = f"motion={words[1]}"
+ words[2] = f"udp-active={words[2]}"
+ words[3] = f"tcp-active={words[3]}"
elif event == "tx":
- words[0] = "g=%s" % words[0]
+ words[0] = f"g={words[0]}"
decode_type(words, 1)
# words[2] = name
# words[3] = control (domain for which these servers are believed
>>> remove(f.name)
"""
- # Open pipe to tai64nlocal: we will write lines of our input (the
- # raw log file) to it, and read log lines with readable timestamps
- # from it.
+ # Open a pipe to tai64nlocal. We'll write lines of our input file
+ # (the log file) to it, and read back the same lines but with
+ # friendly timestamps in them.
+ from subprocess import Popen, PIPE
tai = Popen(["tai64nlocal"], stdin=PIPE, stdout=PIPE, text=True, bufsize=0)
for line in file: