#!/usr/bin/python3

r"""
Simple Python replacement for the MaxMind geoipupdate program.
"""

# stdlib imports
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
import gzip
import hashlib
import os
from pathlib import Path
import shutil
import sys
import tempfile
import tomllib

# external imports
import requests


def main():
    r"""
    The entry point of the geoipyupdate script.
    """
    # Create an argument parser using our docstring as its description.
    parser = ArgumentParser(description=sys.modules[__name__].__doc__,
                            formatter_class=ArgumentDefaultsHelpFormatter)

    # XDG_CONFIG_HOME defaults to ~/.config
    default_xdgch = str(Path.home() / ".config")
    xdgch = os.environ.get("XDG_CONFIG_HOME", default_xdgch)
    default_config_file = os.path.join(xdgch, "geoipyupdate",
                                       "geoipyupdate.toml")
    parser.add_argument('-c', '--config-file', default=default_config_file,
                        help='path to configuration file')
    args = parser.parse_args()

    # Load/parse the config.
    with open(args.config_file, "rb") as f:
        config = tomllib.load(f)
    editions = config["database"]["editions"]
    datadir = config["database"]["datadir"]
    account_id = config["account"]["account_id"]
    license_key = config["account"]["license_key"]

    # Impersonate the true client.
    headers = {'User-Agent': 'geoipupdate/6.1.0'}

    # This never changes.
    url_server = "https://updates.maxmind.com"

    for edition in editions:
        # The final location of this database, which also happens to
        # be where the previous database might be found.
        dbfile = os.path.join(datadir, f"{edition}.mmdb")

        # Compute the hash of the old database, if there is one.
        # Otherwise, leave it blank. This is passed to the server,
        # which may respond with 304 Not Modified if the database is
        # unchanged.
        oldhash = ""
        if os.path.isfile(dbfile):
            with open(dbfile, 'rb') as f:
                oldhash = hashlib.md5(f.read()).hexdigest()

        url_path = f"/geoip/databases/{edition}/update?db_md5={oldhash}"
        url = f"{url_server}{url_path}"
        r = requests.get(url, auth=(account_id, license_key),
                         headers=headers, stream=True, timeout=60)

        if r.status_code == 304:
            # The database hasn't changed since we last downloaded it.
            continue
        r.raise_for_status()

        # Insist on md5 verification of the downloads, i.e. don't handle
        # the case where the md5 response header is missing.
        xdbmd5 = r.headers["X-Database-MD5"]

        # First download the gzipped file to /tmp or wherever. When
        # python-3.12 is more widespread, delete_on_close=False might
        # be a better alternative, allowing us to use a context
        # manager here.
        f = tempfile.NamedTemporaryFile(delete=False)
        for chunk in r.iter_content(chunk_size=128):
            f.write(chunk)
        f.close()

        # Now gunzip it to a new temporary file (we can't simply gunzip
        # in place, because then the name would be predictable). We need
        # delete=False here because we intend to move this file to the
        # datadir.
        g = tempfile.NamedTemporaryFile(delete=False)
        with gzip.open(f.name, 'rb') as gz:
            gdata = gz.read()

        # We're done with f. Remove it ASAP in case something goes
        # wrong.
        os.unlink(f.name)

        newhash = hashlib.md5(gdata).hexdigest()
        if newhash == xdbmd5:
            g.write(gdata)
            g.close()
        else:
            # Don't leave the unused temp file behind before bailing out.
            g.close()
            os.unlink(g.name)
            raise ValueError(
                f"{edition} hash doesn't match X-Database-MD5 header"
            )

        # Overwrite the old database file with the new (gunzipped) one.
        shutil.move(g.name, dbfile)
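

# A minimal sketch of the TOML configuration this script reads. The keys
# match the lookups in main(); the edition IDs, path, and credentials below
# are illustrative placeholders, not values taken from this project:
#
#   [account]
#   account_id = "123456"            # quoted so HTTP basic auth gets a string
#   license_key = "your-license-key"
#
#   [database]
#   datadir = "/var/lib/GeoIP"
#   editions = ["GeoLite2-Country", "GeoLite2-City"]


if __name__ == "__main__":
    # Assumed entry-point guard so main() runs when the file is executed
    # directly (e.g. ./geoipyupdate.py -c /etc/geoipyupdate.toml).
    main()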