unbound-dhcp-leases-bridge: Implement a worker thread to handle all events

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
This commit is contained in:
Michael Tremer
2024-05-10 17:01:50 +01:00
parent d9348a16f1
commit 084795163e

View File

@@ -36,6 +36,7 @@ import stat
import subprocess
import sys
import tempfile
import threading
import time
LOCAL_TTL = 60
@@ -91,6 +92,9 @@ class UnboundDHCPLeasesBridge(object):
# Create a queue for all received events
self.queue = queue.Queue()
# Initialize the worker
self.worker = Worker(self.queue, callback=self._handle_message)
self.unbound = UnboundConfigWriter(unbound_leases_file)
# Load all required data
@@ -99,6 +103,9 @@ class UnboundDHCPLeasesBridge(object):
def run(self):
log.info("Unbound DHCP Leases Bridge started on %s" % self.leases_file)
# Launch the worker
self.worker.start()
# Open the server socket
self.socket = self._open_socket(self.socket_path)
@@ -119,11 +126,6 @@ class UnboundDHCPLeasesBridge(object):
# Decode the data
message = self._decode_message(data)
# Log the received message
log.debug("Received message:")
for key in message:
log.debug(" %-20s = %s" % (key, message[key]))
# Add the message to the queue
self.queue.put(message)
@@ -140,6 +142,10 @@ class UnboundDHCPLeasesBridge(object):
finally:
conn.close()
# Terminate the worker
self.queue.put(None)
self.worker.join()
log.info("Unbound DHCP Leases Bridge terminated")
def _open_socket(self, path):
@@ -193,6 +199,45 @@ class UnboundDHCPLeasesBridge(object):
return message
def _handle_message(self, message):
log.debug("Handling message:")
for key in message:
log.debug(" %-20s = %s" % (key, message[key]))
# Extract the event type
event = message.get("EVENT")
# Check if event is set
if not event:
raise ValueError("The message does not have EVENT set")
# COMMIT
elif event == "commit":
address = message.get("ADDRESS")
name = message.get("NAME")
# Create a new lease
lease = Lease(address, {
"client-hostname" : name,
})
# Apply the lease
self.unbound.apply_lease(lease)
# RELEASE/EXPIRY
elif event in ("release", "expiry"):
address = message.get("ADDRESS")
# Create a new lease
lease = Lease(address, {})
# Remove the lease
self.unbound.remove_lease(lease)
# Raise an error if the event is not supported
else:
raise ValueError("Unsupported event: %s" % event)
def update_dhcp_leases(self):
leases = []
@@ -276,6 +321,36 @@ class UnboundDHCPLeasesBridge(object):
self.socket.close()
class Worker(threading.Thread):
"""
The worker is launched in a separate thread
which allows us to perform some tasks asynchronously.
"""
def __init__(self, queue, callback):
super().__init__()
self.queue = queue
self.callback = callback
def run(self):
log.debug("Worker %s launched" % self.native_id)
while True:
message = self.queue.get()
# If the message is None, we have to quit
if message is None:
break
# Call the callback
try:
self.callback(message)
except Exception as e:
log.error("Callback failed: %s" % e)
log.debug("Worker %s terminated" % self.native_id)
class DHCPLeases(object):
regex_leaseblock = re.compile(r"lease (?P<ipaddr>\d+\.\d+\.\d+\.\d+) {(?P<config>[\s\S]+?)\n}")
@@ -613,6 +688,9 @@ class UnboundConfigWriter(object):
command = ["unbound-control"]
command.extend(args)
# Log what we are doing
log.debug("Running %s" % " ".join(command))
try:
subprocess.check_output(command)
@@ -623,6 +701,28 @@ class UnboundConfigWriter(object):
raise e
def apply_lease(self, lease):
"""
This method takes a lease and updates Unbound at runtime.
"""
log.debug("Applying lease %s" % lease)
for rr in lease.rrset:
log.debug("Adding new record %s" % " ".join(rr))
self._control("local_data", *rr)
def remove_lease(self, lease):
"""
This method takes a lease and removes it from Unbound at runtime.
"""
log.debug("Removing lease %s" % lease)
for name, ttl, type, content in lease.rrset:
log.debug("Removing records for %s" % name)
self._control("local_data_remove", name)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Bridge for DHCP Leases and Unbound DNS")