mirror of
https://gitlab.archlinux.org/archlinux/aurweb.git
synced 2025-02-03 10:43:03 +01:00
aurweb.asgi: add security headers middleware
This commit introduces a middleware function which adds the following security headers to each response: - Content-Security-Policy - This includes a new `nonce`, which is tied to a user via authentication middleware. Both an anonymous user and an authenticated user recieve their own random nonces. - X-Content-Type-Options - Referrer-Policy - X-Frame-Options They are then tested for existence in test/test_routes.py. Note: The overcomplicated-looking asyncio behavior in the middleware function is used to avoid a warning about the old coroutine awaits being deprecated. See https://docs.python.org/3/library/asyncio-task.html#asyncio.wait for more detail. Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
parent
13456fea1e
commit
865c414504
6 changed files with 106 additions and 3 deletions
|
@ -1,6 +1,8 @@
|
|||
import asyncio
|
||||
import http
|
||||
import typing
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
from fastapi.responses import HTMLResponse
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from starlette.middleware.authentication import AuthenticationMiddleware
|
||||
|
@ -55,3 +57,42 @@ async def http_exception_handler(request, exc):
|
|||
phrase = http.HTTPStatus(exc.status_code).phrase
|
||||
return HTMLResponse(f"<h1>{exc.status_code} {phrase}</h1><p>{exc.detail}</p>",
|
||||
status_code=exc.status_code)
|
||||
|
||||
|
||||
@app.middleware("http")
|
||||
async def add_security_headers(request: Request, call_next: typing.Callable):
|
||||
""" This middleware adds the CSP, XCTO, XFO and RP security
|
||||
headers to the HTTP response associated with request.
|
||||
|
||||
CSP: Content-Security-Policy
|
||||
XCTO: X-Content-Type-Options
|
||||
RP: Referrer-Policy
|
||||
XFO: X-Frame-Options
|
||||
"""
|
||||
response = asyncio.create_task(call_next(request))
|
||||
await asyncio.wait({response}, return_when=asyncio.FIRST_COMPLETED)
|
||||
response = response.result()
|
||||
|
||||
# Add CSP header.
|
||||
nonce = request.user.nonce
|
||||
csp = "default-src 'self'; "
|
||||
script_hosts = [
|
||||
"ajax.googleapis.com",
|
||||
"cdn.jsdelivr.net"
|
||||
]
|
||||
csp += f"script-src 'self' 'nonce-{nonce}' " + ' '.join(script_hosts)
|
||||
response.headers["Content-Security-Policy"] = csp
|
||||
|
||||
# Add XTCO header.
|
||||
xcto = "nosniff"
|
||||
response.headers["X-Content-Type-Options"] = xcto
|
||||
|
||||
# Add Referrer Policy header.
|
||||
rp = "same-origin"
|
||||
response.headers["Referrer-Policy"] = rp
|
||||
|
||||
# Add X-Frame-Options header.
|
||||
xfo = "SAMEORIGIN"
|
||||
response.headers["X-Frame-Options"] = xfo
|
||||
|
||||
return response
|
||||
|
|
|
@ -10,7 +10,7 @@ from starlette.requests import HTTPConnection
|
|||
|
||||
import aurweb.config
|
||||
|
||||
from aurweb import l10n
|
||||
from aurweb import l10n, util
|
||||
from aurweb.models.session import Session
|
||||
from aurweb.models.user import User
|
||||
from aurweb.templates import make_variable_context, render_template
|
||||
|
@ -25,6 +25,12 @@ class AnonymousUser:
|
|||
# A stub ssh_pub_key relationship.
|
||||
ssh_pub_key = None
|
||||
|
||||
# A nonce attribute, needed for all browser sessions; set in __init__.
|
||||
nonce = None
|
||||
|
||||
def __init__(self):
|
||||
self.nonce = util.make_nonce()
|
||||
|
||||
@staticmethod
|
||||
def is_authenticated():
|
||||
return False
|
||||
|
@ -55,7 +61,9 @@ class BasicAuthBackend(AuthenticationBackend):
|
|||
# exists, due to ForeignKey constraints in the schema upheld
|
||||
# by mysqlclient.
|
||||
user = session.query(User).filter(User.ID == record.UsersID).first()
|
||||
user.nonce = util.make_nonce()
|
||||
user.authenticated = True
|
||||
|
||||
return AuthCredentials(["authenticated"]), user
|
||||
|
||||
|
||||
|
|
|
@ -37,6 +37,7 @@ class User(Base):
|
|||
|
||||
# High-level variables used to track authentication (not in DB).
|
||||
authenticated = False
|
||||
nonce = None
|
||||
|
||||
def __init__(self, Passwd: str = str(), **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
import base64
|
||||
import math
|
||||
import random
|
||||
import re
|
||||
import secrets
|
||||
import string
|
||||
|
||||
from collections import OrderedDict
|
||||
|
@ -20,6 +22,15 @@ def make_random_string(length):
|
|||
string.digits, k=length))
|
||||
|
||||
|
||||
def make_nonce(length: int = 8):
|
||||
""" Generate a single random nonce. Here, token_hex generates a hex
|
||||
string of 2 hex characters per byte, where the length give is
|
||||
nbytes. This means that to get our proper string length, we need to
|
||||
cut it in half and truncate off any remaining (in the case that
|
||||
length was uneven). """
|
||||
return secrets.token_hex(math.ceil(length / 2))[:length]
|
||||
|
||||
|
||||
def valid_username(username):
|
||||
min_len = aurweb.config.getint("options", "username_min_len")
|
||||
max_len = aurweb.config.getint("options", "username_max_len")
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
<script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.8.2/jquery.min.js"></script>
|
||||
<script type="text/javascript" src="/static/js/bootstrap-typeahead.min.js"></script>
|
||||
<script type="text/javascript">
|
||||
<script type="text/javascript" nonce="{{ request.user.nonce }}">
|
||||
$(document).ready(function() {
|
||||
$('#pkgsearch-field').typeahead({
|
||||
source: function(query, callback) {
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
import re
|
||||
import urllib.parse
|
||||
|
||||
from http import HTTPStatus
|
||||
|
||||
import lxml.etree
|
||||
import pytest
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
@ -39,6 +41,24 @@ def test_index():
|
|||
assert response.status_code == int(HTTPStatus.OK)
|
||||
|
||||
|
||||
def test_index_security_headers():
|
||||
""" Check for the existence of CSP, XCTO, XFO and RP security headers.
|
||||
|
||||
CSP: Content-Security-Policy
|
||||
XCTO: X-Content-Type-Options
|
||||
RP: Referrer-Policy
|
||||
XFO: X-Frame-Options
|
||||
"""
|
||||
# Use `with` to trigger FastAPI app events.
|
||||
with client as req:
|
||||
response = req.get("/")
|
||||
assert response.status_code == int(HTTPStatus.OK)
|
||||
assert response.headers.get("Content-Security-Policy") is not None
|
||||
assert response.headers.get("X-Content-Type-Options") == "nosniff"
|
||||
assert response.headers.get("Referrer-Policy") == "same-origin"
|
||||
assert response.headers.get("X-Frame-Options") == "SAMEORIGIN"
|
||||
|
||||
|
||||
def test_favicon():
|
||||
""" Test the favicon route at '/favicon.ico'. """
|
||||
response1 = client.get("/static/images/favicon.ico")
|
||||
|
@ -106,3 +126,25 @@ def test_error_messages():
|
|||
response2 = client.get("/raisefivethree")
|
||||
assert response1.status_code == int(HTTPStatus.NOT_FOUND)
|
||||
assert response2.status_code == int(HTTPStatus.SERVICE_UNAVAILABLE)
|
||||
|
||||
|
||||
def test_nonce_csp():
|
||||
with client as request:
|
||||
response = request.get("/")
|
||||
data = response.headers.get("Content-Security-Policy")
|
||||
nonce = next(field for field in data.split("; ") if "nonce" in field)
|
||||
match = re.match(r"^script-src .*'nonce-([a-fA-F0-9]{8})' .*$", nonce)
|
||||
nonce = match.group(1)
|
||||
assert nonce is not None and len(nonce) == 8
|
||||
|
||||
parser = lxml.etree.HTMLParser(recover=True)
|
||||
root = lxml.etree.fromstring(response.text, parser=parser)
|
||||
|
||||
nonce_verified = False
|
||||
scripts = root.xpath("//script")
|
||||
for script in scripts:
|
||||
if script.text is not None:
|
||||
assert "nonce" in script.keys()
|
||||
if not (nonce_verified := (script.get("nonce") == nonce)):
|
||||
break
|
||||
assert nonce_verified is True
|
||||
|
|
Loading…
Add table
Reference in a new issue