AppSec Project - Chapter 2, Manually fixing more vulnerabilities
Introduction
Hello, and welcome to chapter 2 of our AppSec project! If you're not up to speed with what's going on, be sure to start with chapter 1.
Today, we'll continue our journey of manually fixing vulnerabilities. Without further delay, let's start.
Weak Password Reset Token Generation
On the login page, there's a "Forgot my password" option.
Let’s investigate the forgot-password route.
@auth.route('/forgot-password', methods=['POST'])
def forgot_password():
email = request.form.get('forgot-email')
user = User.query.filter_by(email=email).first()
# Checking if user exists
if user:
token = str(user.id)
random.seed(int(time.time()) * 1000)
salt = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=10))
token = token + hashlib.sha256((token + salt).encode()).hexdigest() # Generate a secure token
new_token = Token(user_id=user.id, token=token)
db.session.add(new_token)
db.session.commit()
flash('A recovery email has been sent to your address!', 'success')
else:
flash('Email address not found!', 'error')
return redirect(url_for('auth.login')) # Redirect to the login page after submission
What we observe is that when a user exists, a token is generated using the user’s ID and a salt derived from a random number. Now, let’s try sending a recovery email.
The application doesn’t actually send an email since it’s in the development stage, but it does generate a token and stores it in the database.
To reset their password, the user must use the reset-password route by accessing /reset-password/<token>
.
@auth.route('/reset-password/<token>', methods=['GET', 'POST'])
def reset_password(token):
token_entry = Token.query.filter_by(token=token).first()
if request.method == 'POST':
new_password = request.form.get('new_password')
if token_entry:
user = token_entry.user
user.password = new_password
db.session.commit()
flash('Your password has been updated!', 'success')
return redirect(url_for('auth.login'))
else:
flash('Invalid or expired token.', 'error')
return render_template('reset_password.html', token=token, user=current_user)
The issue here is that the user’s ID is part of the token, and the ID is easily accessible through the profile route. Another problem lies in the random seed used to generate the token, which is based on the server’s current timestamp, making it predictable.
token = str(user.id)
random.seed(int(time.time()) * 1000)
salt = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=10))
token = token + hashlib.sha256((token + salt).encode()).hexdigest()
Since the seed is predictable, so is the salt, meaning we can predict the entire token.
An attacker could exploit this by writing a script to generate potential tokens around the timestamp, account for some margin of error, and write these possibilities to a file called tokens.txt
, then brute-force the reset-password route to find the correct token.
import random, hashlib, time, requests
url = "http://127.0.0.1:5000/forgot-password"
data = {"forgot-email":"bido@gmail.com"}
initial_timestamp = int(time.time()) * 1000
requests.post(url, verify=False, data=data)
final_timestamp = initial_timestamp + 1000
for current_timestamp in range(initial_timestamp, final_timestamp):
token = "2"
timestamp = current_timestamp
seed = random.seed(current_timestamp)
salt = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=10))
token = token + hashlib.sha256((token + salt).encode()).hexdigest()
f = open("tokens.txt", "a")
f.write(token + "\n")
f.close()
Here, we see how the last token was found in tokens.txt
.
To fix this vulnerability, we should use a more secure method for generating the token. Instead of relying on the insecure random module and a timestamp-based seed, we can switch to the secrets
module, which is designed for secure cryptographic functions.
import secrets, string
...
if user:
token = str(user.id)
salt = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(10))
token = token + hashlib.sha256((token + salt).encode()).hexdigest() # Generate a secure token
new_token = Token(user_id=user.id, token=token)
db.session.add(new_token)
db.session.commit()
flash('A recovery email has been sent to your address!', 'success')
else:
flash('Email address not found!', 'error')
IDOR
Let’s take a look at the note routes.
At first glance, this appears to be a simple note-taking feature where users can add and remove notes. The code that handles viewing and adding notes seems secure, as it retrieves the user by their session.
@views.route('/notes', methods=['GET', 'POST'])
@login_required
def notes():
if request.method == 'POST':
note = request.form.get('note')
if len(note) < 1:
flash('Note is too short!', category='error')
else:
new_note = Note(data=note, user_id=current_user.id)
db.session.add(new_note)
db.session.commit()
flash('Note added!', category='success')
return render_template("notes.html", user=current_user)
However, the delete-note route stands out as problematic. The noteId
is passed as a request parameter, but there’s no authorization check to ensure that the user deleting the note actually owns it. This creates an Insecure Direct Object Reference (IDOR) vulnerability!
@views.route('/delete-note', methods=['POST'])
@login_required
def delete_node():
note = json.loads(request.data)
noteId = note['noteId']
note = Note.query.get(noteId)
if note:
db.session.delete(note)
db.session.commit()
return jsonify({})
Using Burp Suite, we can easily manipulate the request to delete notes that belong to other users. For instance, as we observe in the database, note 3 belongs to the administrator with user_id 1.
We can successfully delete the admin’s note.
To fix this, we need to validate that the note being deleted belongs to the user making the request.
@views.route('/delete-note', methods=['POST'])
@login_required
def delete_node():
note = json.loads(request.data)
noteId = note['noteId']
note = Note.query.get(noteId)
if note:
# Checking authorization
if note.user_id == current_user.id:
db.session.delete(note)
db.session.commit()
return jsonify({"message": "Note successfully deleted"}), 200
return jsonify({"message": "You cannot delete this note"}), 403
Now, when attempting to delete another user's note, the server correctly responds with a 403 Forbidden error, preventing unauthorized deletion.
Privilege Escalation
Looking at the code, we notice that all routes accessible to normal users have been covered so far.
However, there are additional routes under /admin/
, where permission checks rely on current_user.role != 'administrator'
.
In models.py
, the database model shows that the default role assigned during registration is 'user', so we need a method to escalate our privileges to access these routes.
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(150), unique=True)
password = db.Column(db.String(150))
username = db.Column(db.String(150))
description = db.Column(db.String(500), nullable=True)
image = db.Column(db.String(150), default='default-icon.png')
role = db.Column(db.String(150), default='user')
notes = db.relationship('Note')
The /admin/dashboard
route has a POST method intended to update user roles based on JSON input containing a username and a role. However, the role change validation is incomplete because the role check is being performed after the request is processed.
@views.route('/admin/dashboard', methods=['GET', 'POST'])
@login_required
def admin_dashboard():
if request.method == 'POST':
data_str = request.data
data = json.loads(data_str)
username = data.get('user')
new_role = data.get('role')
# Find the user by username
user = User.query.filter_by(username=username).first()
if not user:
return jsonify({"message": "User not found"}), 404
if new_role.lower() == 'administrator':
return jsonify({"message": "You cannot update the role to 'administrator'"}), 404
else:
user.role = new_role
db.session.commit()
return jsonify({"message": f"User {username} updated to role {new_role}"}), 200
if current_user.role != 'administrator':
return "Access Denied", 403
query = request.args.get('query', '')
notes = []
if query:
sql = text(f"SELECT * FROM note WHERE data LIKE '%{query}%'")
notes = db.session.execute(sql).fetchall()
rendered_notes = [render_template_string(note.data) for note in notes]
return render_template("admin.html", user=current_user, query=query, notes=rendered_notes)
The code includes a check to block escalation to the administrator role, as seen in the snippet below.
if new_role.lower() == 'administrator':
return jsonify({"message": "You cannot update the role to 'administrator'"}), 404
Interestingly, the developers implemented an unnecessary unicode sanitization on the role value, allowing us to exploit this oversight.
user.role = new_role.encode('utf-8', 'ignore').decode('utf-8')
By appending a Unicode character, like \ud888
, to the role value administrator, we can bypass the check, since administrator\ud888
is different from administrator
. The decoding removes \ud888
, leaving us with the elevated administrator role.
Payload:
{"user":"b1d0ws","role":"administrator\ud888"}
Now we have access to the administrator dashboard.
To fix this, remove the unnecessary unicode decoding to prevent unintended bypasses. This example illustrates how JSON injection can enable privilege escalation in APIs, inspired by Dana Epp’s article.
This can happen when an application has multiple ways to parse data, creating inconsistencies that allows this type of exploitation.
After applying the fix, attempting to set the role to administrator now triggers a 500 error instead of updating the user role.
SQL Injection
In the admin panel, users have a search function to look up notes.
By entering a single quote ('
) in the search box, we trigger an error that reveals the database query structure due to the application running in debug mode. We'll address this debug setting later as part of our improvements.
For now, let’s take a closer look at the query itself in the source code. In the /admin/dashboard
route, we see the following statement:
query = request.args.get('query', '')
notes = []
if query:
sql = text(f"SELECT * FROM note WHERE data LIKE '%{query}%'")
notes = db.session.execute(sql).fetchall()
rendered_notes = [render_template_string(note.data) for note in notes]
return render_template("admin.html", user=current_user, query=query, notes=rendered_notes)
Here, the search parameter is directly concatenated into the SQL statement, which makes the query vulnerable to SQL injection. With this exposure, an attacker could potentially inject code like the one below to retrieve sensitive information, such as user passwords.
' UNION SELECT 1,password,3,4 FROM user –
To fix this vulnerability, we can use SQLAlchemy’s parameterized queries, which avoid direct concatenation of user inputs and ensure the input is safely processed.
if query:
sql = text("SELECT * FROM note WHERE data LIKE :query")
notes = db.session.execute(sql, {'query': f'%{query}%'}).fetchall()
After implementing this fix, any attempt to inject SQL through the query parameter will be interpreted only as text, preserving database security and query integrity.
SSTI
In the code analyzed just now, we noticed a curious function. The application uses render_template_string
, which means it directly interprets the content of note.data
as a template.
Since we control the content in the notes tab, we can inject code to execute commands.
rendered_notes = [render_template_string(note.data) for note in notes]
return render_template("admin.html", user=current_user, query=query, notes=rendered_notes)
For example, using the following payload, we can save it as a note and then trigger it through a search on the admin panel.
This executes the whoami
command, which returns pc\eduar
.
Additionally, we can achieve a reverse shell with this payload:
nc64.exe
in the application’s folder to establish the reverse shell. This setup may differ if testing in a restricted or Linux environment.If render_template_string
were necessary, we could escape the note content to mitigate this risk. However, because the functionality does not require template rendering, we can simply switch to render_template
to safely display notes.
return render_template("admin.html", user=current_user, query=query, notes=notes)
To complete the fix, update admin.html
as follows, changing it from:
<ul class="list-group">
{% for note in notes %}
<li class="list-group-item">{{ note }}</li>
{% endfor %}
</ul>
To:
<ul class="list-group">
{% for note in notes %}
<li class="list-group-item">{{ note.data }}</li>
{% endfor %}
</ul>
Now, the content of each note displays as text, preventing any code execution.
SSRF
In this section, we will examine the "Fetch Resource from URL" functionality to identify potential vulnerabilities.
When we enter https://google.com
, the website is successfully displayed, indicating that the functionality is working.
However, upon reviewing the code, we notice that the url parameter is directly passed to the requests library to retrieve content.
@views.route('/admin/fetch-url', methods=['POST'])
@login_required
def fetch_url():
if current_user.role != 'administrator':
return "Access Denied", 403
url = request.form.get('url')
try:
response = requests.get(url)
return response.content
except Exception as e:
return jsonify({"error": str(e)}), 400
Since there is no input sanitization, an attacker could exploit this vulnerability to perform Server-Side Request Forgery (SSRF) by targeting internal addresses.
To demonstrate this, I started a web server hosting a secret page on port 80. The following payload successfully reveals the internal content:
To address this vulnerability, we need to sanitize user input to prevent access to any internal addresses.
The first step is to check the protocol and ensure it is either HTTP or HTTPS. An attacker might exploit unsupported protocols to perform SSRF attacks with potentially severe consequences.
if parsed_url.scheme not in ["http", "https"]:
return jsonify({"error": "Invalid URL scheme"}), 400
Ideally, the best practice for preventing SSRF attacks involves implementing an allow list of trusted domains. For example:
TRUSTED_DOMAINS = ["example.com", "another-trusted-domain.com"]
hostname = parsed_url.hostname
if hostname not in TRUSTED_DOMAINS:
return jsonify({"error": "Access to this host is restricted"}), 403
Instead, we can define forbidden IP ranges using ip_network
from the ipaddress
module. This will allow us to block access to certain internal routes, including localhost and private IP addresses:
FORBIDDEN_IP_RANGES = [
ip_network("127.0.0.0/8"), # Loopback
ip_network("10.0.0.0/8"), # Private
ip_network("172.16.0.0/12"), # Private
ip_network("192.168.0.0/16"), # Private
ip_network("169.254.0.0/16"), # Link-local
ip_network("::1/128"), # IPv6 loopback
ip_network("fc00::/7"), # IPv6 unique local
ip_network("fe80::/10") # IPv6 link-local
# Validate host and avoid private IPs
hostname = parsed_url.hostname
if re.match(r"^(localhost|127\.|0\.0\.0\.0|::1|10\.|192\.168\.|172\.(1[6-9]|2[0-9]|3[01]))", hostname):
return jsonify({"error": "Access to this host is restricted"}), 403
Additionally, we can perform DNS resolution to catch common bypass attempts, such as encoded URLs. By comparing the resolved URL against the forbidden IPs, we can further enhance security.
try:
# Resolve hostname to an IP and validate it
resolved_ip = ip_address(socket.gethostbyname(hostname))
# Check if the resolved IP falls within forbidden ranges
if any(resolved_ip in net for net in FORBIDDEN_IP_RANGES):
return jsonify({"error": "Access to this IP is restricted"}), 403
Conclusion
In today's article, we fixed six vulnerabilities, making our application significantly more secure. In the next chapter, we will focus on enhancing the overall security of the app, though we won't be specifically addressing vulnerabilities.
For these fixes, we’re still on branch fixing and the commit reference is 1aeff150dc58f7d4d3c060b665b7902db6f01de5
.
See ya =)
Subscribe to my newsletter
Read articles from b1d0ws directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by