Instead of using True/False to signal success after possibly using
print() to actually write the result, we not just return either the
processed string, or None.
import re, typing
from struct import pack
import re, typing
from struct import pack
-from time import strftime, gmtime
from subprocess import Popen, PIPE
from subprocess import Popen, PIPE
+from time import strftime, gmtime
## Regular expressions for matching tinydns/dnscache log lines. We
## Regular expressions for matching tinydns/dnscache log lines. We
qt = words[i]
words[i] = query_type.get(int(qt), qt)
qt = words[i]
words[i] = query_type.get(int(qt), qt)
-def handle_dnscache_log(line) -> bool:
+def handle_dnscache_log(line) -> typing.Optional[str]:
"""
Handle a single log line if it matches the ``dnscache_log_re`` regex.
"""
Handle a single log line if it matches the ``dnscache_log_re`` regex.
- ``True`` if the log line was handled (that is, if it was really a
- dnscache log line), and ``False`` otherwise.
+ Either the human-readable string if the log line was handled (that
+ is, if it was really a dnscache log line), or ``None`` if it was
+ not.
Examples
--------
>>> line = "2022-09-15 18:37:33.863805500 query 1 7f000001:a3db:4fb9 1 www.example.com."
>>> handle_dnscache_log(line)
Examples
--------
>>> line = "2022-09-15 18:37:33.863805500 query 1 7f000001:a3db:4fb9 1 www.example.com."
>>> handle_dnscache_log(line)
- 2022-09-15 18:37:33.863805500 query #1 127.0.0.1:41947 (id 20409) a www.example.com.
- True
+ '2022-09-15 18:37:33.863805500 query #1 127.0.0.1:41947 (id 20409) a www.example.com.'
>>> line = "2022-09-15 18:37:33.863874500 tx 0 1 www.example.com. . c0a80101"
>>> handle_dnscache_log(line)
>>> line = "2022-09-15 18:37:33.863874500 tx 0 1 www.example.com. . c0a80101"
>>> handle_dnscache_log(line)
- 2022-09-15 18:37:33.863874500 tx g=0 a www.example.com. . 192.168.1.1
- True
+ '2022-09-15 18:37:33.863874500 tx g=0 a www.example.com. . 192.168.1.1'
>>> line = "2022-09-15 18:37:33.878529500 rr c0a80101 20865 1 www.example.com. 5db8d822"
>>> handle_dnscache_log(line)
>>> line = "2022-09-15 18:37:33.878529500 rr c0a80101 20865 1 www.example.com. 5db8d822"
>>> handle_dnscache_log(line)
- 2022-09-15 18:37:33.878529500 rr 192.168.1.1 TTL=20865 a www.example.com. 93.184.216.34
- True
+ '2022-09-15 18:37:33.878529500 rr 192.168.1.1 TTL=20865 a www.example.com. 93.184.216.34'
>>> line = "2022-09-15 18:37:33.878532500 stats 1 43 1 0"
>>> handle_dnscache_log(line)
>>> line = "2022-09-15 18:37:33.878532500 stats 1 43 1 0"
>>> handle_dnscache_log(line)
- 2022-09-15 18:37:33.878532500 stats count=1 motion=43 udp-active=1 tcp-active=0
- True
+ '2022-09-15 18:37:33.878532500 stats count=1 motion=43 udp-active=1 tcp-active=0'
>>> line = "2022-09-15 18:37:33.878602500 sent 1 49"
>>> handle_dnscache_log(line)
>>> line = "2022-09-15 18:37:33.878602500 sent 1 49"
>>> handle_dnscache_log(line)
- 2022-09-15 18:37:33.878602500 sent #1 49
- True
+ '2022-09-15 18:37:33.878602500 sent #1 49'
>>> line = "this line is nonsense"
>>> handle_dnscache_log(line)
>>> line = "this line is nonsense"
>>> handle_dnscache_log(line)
"""
match = dnscache_log_re.match(line)
if not match:
"""
match = dnscache_log_re.match(line)
if not match:
(timestamp, event, data) = match.groups()
(timestamp, event, data) = match.groups()
elif event in ("tcpopen", "tcpclose"):
decode_client(words, 0)
elif event in ("tcpopen", "tcpclose"):
decode_client(words, 0)
- print(timestamp, event, " ".join(words))
- return True
+ return f"{timestamp} {event} " + " ".join(words)
+
-def handle_tinydns_log(line : str) -> bool:
+def handle_tinydns_log(line : str) -> typing.Optional[str]:
"""
Handle a single log line if it matches the ``tinydns_log_re`` regex.
"""
Handle a single log line if it matches the ``tinydns_log_re`` regex.
- ``True`` if the log line was handled (that is, if it was really a
- tinydns log line), and ``False`` otherwise.
+ Either the human-readable string if the log line was handled (that
+ is, if it was really a tinydns log line), or ``None`` if it was
+ not.
Examples
--------
>>> line = "2022-09-14 21:04:40.206516500 7f000001:9d61:be69 - 0001 www.example.com"
>>> handle_tinydns_log(line)
Examples
--------
>>> line = "2022-09-14 21:04:40.206516500 7f000001:9d61:be69 - 0001 www.example.com"
>>> handle_tinydns_log(line)
- 2022-09-14 21:04:40.206516500 dropped query (no authority) from 127.0.0.1:40289 (id 48745): a www.example.com
- True
+ '2022-09-14 21:04:40.206516500 dropped query (no authority) from 127.0.0.1:40289 (id 48745): a www.example.com'
>>> line = "this line is nonsense"
>>> handle_tinydns_log(line)
>>> line = "this line is nonsense"
>>> handle_tinydns_log(line)
"""
match = tinydns_log_re.match(line)
if not match:
"""
match = tinydns_log_re.match(line)
if not match:
(timestamp, ip, port, id, code, type, name) = match.groups()
ip = convert_ip(ip)
(timestamp, ip, port, id, code, type, name) = match.groups()
ip = convert_ip(ip)
type = int(type, 16) # "001c" -> 28
type = query_type.get(type, type) # 28 -> "aaaa"
type = int(type, 16) # "001c" -> 28
type = query_type.get(type, type) # 28 -> "aaaa"
- print(timestamp, end=' ')
+ line_tpl = "{timestamp} "
reason = query_drop_reason[code]
if code == "+":
reason = query_drop_reason[code]
if code == "+":
- line_tpl = "sent response to {ip}:{port} (id {id}): {type} {name}"
+ line_tpl += "sent response to {ip}:{port} (id {id}): {type} {name}"
- line_tpl = "dropped query ({reason}) from {ip}:{port}"
+ line_tpl += "dropped query ({reason}) from {ip}:{port}"
if code != "/":
# If the query can actually be parsed, the log line is a
# bit more informative than it would have been otherwise.
line_tpl += " (id {id}): {type} {name}"
if code != "/":
# If the query can actually be parsed, the log line is a
# bit more informative than it would have been otherwise.
line_tpl += " (id {id}): {type} {name}"
- print(line_tpl.format(reason=reason,
- ip=ip,
- port=port,
- id=id,
- type=type,
- name=name))
- return True
+ return line_tpl.format(timestamp=timestamp,
+ reason=reason,
+ ip=ip,
+ port=port,
+ id=id,
+ type=type,
+ name=name)
def parse_logfile(file : typing.TextIO):
def parse_logfile(file : typing.TextIO):
tai.stdin.write(line)
line = tai.stdout.readline()
tai.stdin.write(line)
line = tai.stdout.readline()
- if not handle_tinydns_log(line):
- if not handle_dnscache_log(line):
- print(line, end='')
+ friendly_line = handle_tinydns_log(line)
+ if not friendly_line:
+ friendly_line = handle_dnscache_log(line)
+ if not friendly_line:
+ friendly_line = line
+
+ print(friendly_line)