Files
bpfire/config/ovpn/openvpn-authenticator
Michael Tremer c9dc7fdec0 openvpn-authenticator: Always return general connection data
The function returned different output when TOTP was configured and not
which is not what it should do.

This version will now try to add the TOTP configuration, or will add
nothing it if fails to do so.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
2022-06-17 10:20:19 +00:00

382 lines
9.7 KiB
Python

#!/usr/bin/python3
###############################################################################
# #
# IPFire.org - A linux based firewall #
# Copyright (C) 2022 Michael Tremer #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <http://www.gnu.org/licenses/>. #
# #
###############################################################################
import argparse
import base64
import csv
import daemon
import logging
import logging.handlers
import signal
import socket
import subprocess
import sys
OPENVPN_CONFIG = "/var/ipfire/ovpn/ovpnconfig"
CHALLENGETEXT = "One Time Token: "
log = logging.getLogger()
log.setLevel(logging.DEBUG)
def setup_logging(daemon=True, loglevel=logging.INFO):
log.setLevel(loglevel)
# Log to syslog by default
handler = logging.handlers.SysLogHandler(address="/dev/log", facility="daemon")
log.addHandler(handler)
# Format everything
formatter = logging.Formatter("%(name)s[%(process)d]: %(message)s")
handler.setFormatter(formatter)
handler.setLevel(loglevel)
# If we are running in foreground, we should write everything to the console, too
if not daemon:
handler = logging.StreamHandler()
log.addHandler(handler)
handler.setLevel(loglevel)
return log
class OpenVPNAuthenticator(object):
def __init__(self, socket_path):
self.socket_path = socket_path
def _read_line(self):
buf = []
while True:
char = self.sock.recv(1)
buf.append(char)
# Reached end of line
if char == b"\n":
break
line = b"".join(buf).decode()
line = line.rstrip()
log.debug("< %s" % line)
return line
def _write_line(self, line):
log.debug("> %s" % line)
if not line.endswith("\n"):
line = "%s\n" % line
# Convert into bytes
buf = line.encode()
# Send to socket
self.sock.send(buf)
def _send_command(self, command):
# Send the command
self._write_line(command)
return # XXX Code below doesn't work
# Read response
response = self._read_line()
# Handle response
if not response.startswith("SUCCESS:"):
log.error("Command '%s' returned an error:" % command)
log.error(" %s" % response)
return response
def run(self):
# Connect to socket
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect(self.socket_path)
log.info("OpenVPN Authenticator started")
while True:
line = self._read_line()
if line.startswith(">CLIENT"):
self._client_event(line)
log.info("OpenVPN Authenticator terminated")
def terminate(self, *args):
# XXX TODO
raise SystemExit
def _client_event(self, line):
# Strip away "CLIENT:"
client, delim, line = line.partition(":")
# Extract the event & split any arguments
event, delim, arguments = line.partition(",")
arguments = arguments.split(",")
environ = {}
if event == "CONNECT":
environ = self._read_env(environ)
self._client_connect(*arguments, environ=environ)
elif event == "DISCONNECT":
environ = self._read_env(environ)
self._client_disconnect(*arguments, environ=environ)
elif event == "REAUTH":
environ = self._read_env(environ)
self._client_reauth(*arguments, environ=environ)
elif event == "ESTABLISHED":
environ = self._read_env(environ)
else:
log.debug("Unhandled event: %s" % event)
def _read_env(self, environ):
# Read environment
while True:
line = self._read_line()
if not line.startswith(">CLIENT:ENV,"):
raise RuntimeError("Unexpected environment line: %s" % line)
# Strip >CLIENT:ENV,
line = line[12:]
# Done
if line == "END":
break
# Parse environment
key, delim, value = line.partition("=")
environ[key] = value
return environ
def _client_connect(self, cid, kid, environ={}):
log.debug("Received client connect (cid=%s, kid=%s)" % (cid, kid))
for key in sorted(environ):
log.debug(" %s : %s" % (key, environ[key]))
# Fetch common name
common_name = environ.get("common_name")
# Find connection details
conn = self._find_connection(common_name)
if not conn:
log.warning("Could not find connection '%s'" % common_name)
# XXX deny auth?
log.debug("Found connection:")
for key in conn:
log.debug(" %s : %s" % (key, conn[key]))
# Perform no further checks if TOTP is disabled for this client
if not conn.get("totp_status") == "on":
return self._client_auth_successful(cid, kid)
# Fetch username & password
username = environ.get("username")
password = environ.get("password")
# Client sent the special password TOTP to start challenge authentication
if password == "TOTP":
return self._client_auth_challenge(cid, kid,
username=common_name, password="TOTP")
elif password.startswith("CRV1:"):
log.debug("Received dynamic challenge response %s" % password)
# Decode the string
(command, flags, username, password, token) = password.split(":", 5)
# Decode username
username = self._b64decode(username)
# Check if username matches common name
if username == common_name:
# Check if TOTP token matches
if self._check_totp_token(token, conn.get("totp_secret")):
return self._client_auth_successful(cid, kid)
# Restart authentication
self._client_auth_challenge(cid, kid,
username=common_name, password="TOTP")
def _client_disconnect(self, cid, environ={}):
"""
Handles CLIENT:DISCONNECT events
"""
pass
def _client_reauth(self, cid, kid, environ={}):
"""
Handles CLIENT:REAUTH events
"""
# Perform no checks
self._client_auth_successful(cid, kid)
def _client_auth_challenge(self, cid, kid, username, password):
"""
Initiates a dynamic challenge authentication with the client
"""
log.debug("Sending request for dynamic challenge...")
self._send_command(
"client-deny %s %s \"CRV1\" \"CRV1:R,E:%s:%s:%s\"" % (
cid,
kid,
self._b64encode(username),
self._b64encode(password),
self._escape(CHALLENGETEXT),
),
)
def _client_auth_successful(self, cid, kid):
"""
Sends a positive authentication response
"""
log.debug("Client Authentication Successful (cid=%s, kid=%s)" % (cid, kid))
self._send_command(
"client-auth-nt %s %s" % (cid, kid),
)
@staticmethod
def _b64encode(s):
return base64.b64encode(s.encode()).decode()
@staticmethod
def _b64decode(s):
return base64.b64decode(s.encode()).decode()
@staticmethod
def _escape(s):
return s.replace(" ", "\ ")
def _find_connection(self, common_name):
with open(OPENVPN_CONFIG, "r") as f:
for row in csv.reader(f, dialect="unix"):
# Skip empty rows or rows that are too short
if not row or len(row) < 5:
continue
# Skip disabled connections
if not row[1] == "on":
continue
# Skip any net-2-net connections
if not row[4] == "host":
continue
# Skip if common name does not match
if not row[3] == common_name:
continue
# Return match!
conn = {
"name" : row[2],
"common_name" : row[3],
}
# TOTP options
try:
conn |= {
"totp_protocol" : row[43],
"totp_status" : row[44],
"totp_secret" : row[45],
}
except IndexError:
pass
return conn
def _check_totp_token(self, token, secret):
p = subprocess.run(
["oathtool", "--totp", "-w", "3", "%s" % secret],
capture_output=True,
)
# Catch any errors if we could not run the command
if p.returncode:
log.error("Could not run oathtool: %s" % p.stderr)
return False
# Reading returned tokens looking for a match
for line in p.stdout.split(b"\n"):
# Skip empty/last line(s)
if not line:
continue
# Decode bytes into string
line = line.decode()
# Return True if a token matches
if line == token:
return True
# No match
return False
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="OpenVPN Authenticator")
# Daemon Stuff
parser.add_argument("--daemon", "-d", action="store_true",
help="Launch as daemon in background")
parser.add_argument("--verbose", "-v", action="count", help="Be more verbose")
# Paths
parser.add_argument("--socket", default="/var/run/openvpn.sock",
metavar="PATH", help="Path to OpenVPN Management Socket")
# Parse command line arguments
args = parser.parse_args()
# Setup logging
loglevel = logging.WARN
if args.verbose:
if args.verbose == 1:
loglevel = logging.INFO
elif args.verbose >= 2:
loglevel = logging.DEBUG
# Create an authenticator
authenticator = OpenVPNAuthenticator(args.socket)
with daemon.DaemonContext(
detach_process=args.daemon,
stderr=None if args.daemon else sys.stderr,
signal_map = {
signal.SIGINT : authenticator.terminate,
signal.SIGTERM : authenticator.terminate,
},
) as daemon:
setup_logging(daemon=args.daemon, loglevel=loglevel)
authenticator.run()