AppSec Project - Chapter 2, Manually fixing more vulnerabilities

b1d0ws
10 min read

Introduction

Hello, and welcome to chapter 2 of our AppSec project! If you're not up to speed with what's going on, be sure to start with chapter 1.

Today, we'll continue our journey of manually fixing vulnerabilities. Without further delay, let's start.


Weak Password Reset Token Generation

On the login page, there's a "Forgot my password" option.

Let’s investigate the forgot-password route.

@auth.route('/forgot-password', methods=['POST'])
def forgot_password():
    email = request.form.get('forgot-email')

    user = User.query.filter_by(email=email).first()

    # Checking if user exists
    if user:
        token = str(user.id)
        random.seed(int(time.time()) * 1000)
        salt = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=10))
        token = token + hashlib.sha256((token + salt).encode()).hexdigest()  # Generate a secure token
        new_token = Token(user_id=user.id, token=token)
        db.session.add(new_token)
        db.session.commit()
        flash('A recovery email has been sent to your address!', 'success')
    else:
        flash('Email address not found!', 'error')

    return redirect(url_for('auth.login'))  # Redirect to the login page after submission

What we observe is that when a user exists, a token is generated using the user’s ID and a salt derived from a random number. Now, let’s try sending a recovery email.

The application doesn't actually send an email since it's still in the development stage, but it does generate a token and store it in the database.

To reset their password, the user must use the reset-password route by accessing /reset-password/<token>.

@auth.route('/reset-password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    token_entry = Token.query.filter_by(token=token).first()

    if request.method == 'POST':
        new_password = request.form.get('new_password')
        if token_entry:
            user = token_entry.user
            user.password = new_password
            db.session.commit()
            flash('Your password has been updated!', 'success')
            return redirect(url_for('auth.login'))
        else:
            flash('Invalid or expired token.', 'error')

    return render_template('reset_password.html', token=token, user=current_user)

The issue here is that the user’s ID is part of the token, and the ID is easily accessible through the profile route. Another problem lies in the random seed used to generate the token, which is based on the server’s current timestamp, making it predictable.

token = str(user.id)
random.seed(int(time.time()) * 1000)
salt = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=10))
token = token + hashlib.sha256((token + salt).encode()).hexdigest()

Since the seed is predictable, so is the salt, meaning we can predict the entire token.
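
To make this concrete, the short sketch below re-creates the token logic in a standalone function and shows that two independent runs with the same second produce the same token. The helper name make_token and the example timestamp are assumptions for illustration, not part of the application.

```python
import hashlib
import random

ALPHABET = 'abcdefghijklmnopqrstuvwxyz0123456789'

def make_token(user_id: int, unix_second: int) -> str:
    """Re-create the route's token for a given user and second (hypothetical helper)."""
    random.seed(unix_second * 1000)  # same seed expression as the route
    salt = ''.join(random.choices(ALPHABET, k=10))
    prefix = str(user_id)
    return prefix + hashlib.sha256((prefix + salt).encode()).hexdigest()

# Server and attacker compute the token independently for the same second.
server_token = make_token(2, 1_700_000_000)
attacker_guess = make_token(2, 1_700_000_000)
print(server_token == attacker_guess)  # True: same seed, same salt, same token
```

Nothing in the token is secret: the user ID is public and the seed can be enumerated one second at a time.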

An attacker could exploit this by writing a script to generate potential tokens around the timestamp, account for some margin of error, and write these possibilities to a file called tokens.txt, then brute-force the reset-password route to find the correct token.

import hashlib
import random
import time

import requests

url = "http://127.0.0.1:5000/forgot-password"
data = {"forgot-email": "bido@gmail.com"}

# The server seeds with int(time.time()) * 1000, so only whole seconds matter.
start_second = int(time.time())
requests.post(url, data=data)
end_second = int(time.time())

margin = 2        # seconds of tolerance for clock drift between client and server
user_id = "2"     # ID of the target user, visible through the profile route

with open("tokens.txt", "a") as f:
    for second in range(start_second - margin, end_second + margin + 1):
        random.seed(second * 1000)
        salt = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=10))
        token = user_id + hashlib.sha256((user_id + salt).encode()).hexdigest()
        f.write(token + "\n")

Here, we see how the last token was found in tokens.txt.

To fix this vulnerability, we should use a more secure method for generating the token. Instead of relying on the insecure random module and a timestamp-based seed, we can switch to the secrets module, which is designed for generating cryptographically strong random values.

import secrets, string
...
    if user:
        token = str(user.id)
        salt = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(10))
        token = token + hashlib.sha256((token + salt).encode()).hexdigest()  # Generate a secure token
        new_token = Token(user_id=user.id, token=token)
        db.session.add(new_token)
        db.session.commit()
        flash('A recovery email has been sent to your address!', 'success')
    else:
        flash('Email address not found!', 'error')
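
As a side note, once the randomness comes from secrets, the user ID prefix and the SHA-256 step no longer add any security. Below is a minimal sketch of a simpler variant, assuming the Token model can store a 43-character string. The helper name new_reset_token is hypothetical.

```python
import secrets

def new_reset_token(num_bytes: int = 32) -> str:
    """Return a URL-safe random token drawn from the OS CSPRNG (hypothetical helper)."""
    return secrets.token_urlsafe(num_bytes)

token = new_reset_token()
print(len(token))  # 43 characters for 32 random bytes
```

The token can be used directly in /reset-password/<token> because token_urlsafe only emits URL-safe characters.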

IDOR

Let’s take a look at the note routes.

At first glance, this appears to be a simple note-taking feature where users can add and remove notes. The code that handles viewing and adding notes seems secure, as it retrieves the user by their session.

@views.route('/notes', methods=['GET', 'POST'])
@login_required
def notes():
    if request.method == 'POST':
        note = request.form.get('note')

        if len(note) < 1:
            flash('Note is too short!', category='error')
        else:
            new_note = Note(data=note, user_id=current_user.id)
            db.session.add(new_note)
            db.session.commit()
            flash('Note added!', category='success')

    return render_template("notes.html", user=current_user)

However, the delete-note route stands out as problematic. The noteId is passed as a request parameter, but there’s no authorization check to ensure that the user deleting the note actually owns it. This creates an Insecure Direct Object Reference (IDOR) vulnerability!

@views.route('/delete-note', methods=['POST'])
@login_required
def delete_node():
    note = json.loads(request.data)
    noteId = note['noteId']
    note = Note.query.get(noteId)
    if note: 
        db.session.delete(note)
        db.session.commit()
    return jsonify({})

Using Burp Suite, we can easily manipulate the request to delete notes that belong to other users. For instance, as we observe in the database, note 3 belongs to the administrator with user_id 1.

We can successfully delete the admin’s note.

To fix this, we need to validate that the note being deleted belongs to the user making the request.

@views.route('/delete-note', methods=['POST'])
@login_required
def delete_node():
    note = json.loads(request.data)
    noteId = note['noteId']
    note = Note.query.get(noteId)

    if note:
        # Checking authorization
        if note.user_id == current_user.id:
            db.session.delete(note)
            db.session.commit()
            return jsonify({"message": "Note successfully deleted"}), 200
    return jsonify({"message": "You cannot delete this note"}), 403

Now, when attempting to delete another user's note, the server correctly responds with a 403 Forbidden error, preventing unauthorized deletion.
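
The ownership rule can also be looked at in isolation from Flask. The sketch below uses a stand-in Note dataclass and a hypothetical authorize_delete helper that returns the status code the route would send. Neither is part of the application.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Note:
    """Stand-in for the application's Note model."""
    id: int
    user_id: int
    data: str

def authorize_delete(note: Optional[Note], current_user_id: int) -> int:
    """Return the HTTP status the delete route would answer with (hypothetical helper)."""
    if note is not None and note.user_id == current_user_id:
        return 200
    # Missing notes and notes owned by someone else get the same answer,
    # so the response does not reveal which note IDs exist.
    return 403

admin_note = Note(id=3, user_id=1, data="admin note")
print(authorize_delete(admin_note, current_user_id=2))  # 403
print(authorize_delete(admin_note, current_user_id=1))  # 200
print(authorize_delete(None, current_user_id=2))        # 403
```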


Privilege Escalation

Looking at the code, we notice that all routes accessible to normal users have been covered so far.

However, there are additional routes under /admin/, where permission checks rely on current_user.role != 'administrator'.

In models.py, the database model shows that the default role assigned during registration is 'user', so we need a method to escalate our privileges to access these routes.

class User(db.Model, UserMixin):
    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(150), unique=True)
    password = db.Column(db.String(150))
    username = db.Column(db.String(150))
    description = db.Column(db.String(500), nullable=True)
    image = db.Column(db.String(150), default='default-icon.png')
    role = db.Column(db.String(150), default='user')
    notes = db.relationship('Note')

The /admin/dashboard route has a POST method intended to update user roles based on JSON input containing a username and a role. However, the access control is incomplete: the administrator role check only runs after the POST branch has already returned, so any logged-in user can submit a role change.

@views.route('/admin/dashboard', methods=['GET', 'POST'])
@login_required
def admin_dashboard():

    if request.method == 'POST':

        data_str = request.data
        data = json.loads(data_str)

        username = data.get('user')
        new_role = data.get('role')

        # Find the user by username
        user = User.query.filter_by(username=username).first()

        if not user:
            return jsonify({"message": "User not found"}), 404

        if new_role.lower() == 'administrator':
            return jsonify({"message": "You cannot update the role to 'administrator'"}), 404
        else:
            user.role = new_role.encode('utf-8', 'ignore').decode('utf-8')

        db.session.commit()

        return jsonify({"message": f"User {username} updated to role {new_role}"}), 200

    if current_user.role != 'administrator':
        return "Access Denied", 403

    query = request.args.get('query', '')
    notes = []

    if query:
        sql = text(f"SELECT * FROM note WHERE data LIKE '%{query}%'")
        notes = db.session.execute(sql).fetchall()

    rendered_notes = [render_template_string(note.data) for note in notes]

    return render_template("admin.html", user=current_user, query=query, notes=rendered_notes)

The code includes a check to block escalation to the administrator role, as seen in the snippet below.

if new_role.lower() == 'administrator':
    return jsonify({"message": "You cannot update the role to 'administrator'"}), 404

Interestingly, the developers added an unnecessary Unicode sanitization step to the role value, and we can exploit this oversight.

user.role = new_role.encode('utf-8', 'ignore').decode('utf-8')

By appending a lone surrogate such as \ud888 to the role value administrator, we can bypass the check, since administrator\ud888 is different from administrator. A lone surrogate cannot be encoded as UTF-8, so encoding with 'ignore' silently drops \ud888, leaving us with the elevated administrator role.

Payload:

{"user":"b1d0ws","role":"administrator\ud888"}

Now we have access to the administrator dashboard.

To fix this, remove the unnecessary Unicode sanitization to prevent unintended bypasses. This example illustrates how JSON injection can enable privilege escalation in APIs, inspired by Dana Epp’s article.

This can happen when an application parses the same data in more than one way, creating inconsistencies that allow this type of exploitation.

After applying the fix, replaying the same payload triggers a 500 error instead of updating the user role.
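
The bypass can be reproduced with the standard library alone. The sketch below parses the payload, shows why the deny-list comparison misses it, and contrasts that with an allow-list check. ALLOWED_ROLES and is_assignable are hypothetical and only illustrate one possible stricter validation; they are not the fix applied in the project.

```python
import json

ALLOWED_ROLES = {"user", "moderator"}  # hypothetical list of assignable roles

payload = '{"user":"b1d0ws","role":"administrator\\ud888"}'
role = json.loads(payload)["role"]

# The deny-list check compares against the raw value, which still
# carries the lone surrogate, so it does not match.
print(role.lower() == "administrator")  # False

# Encoding with errors='ignore' silently drops the surrogate.
sanitized = role.encode("utf-8", "ignore").decode("utf-8")
print(sanitized)  # administrator

def is_assignable(candidate: str) -> bool:
    """Allow-list check: accept only exact, known role names."""
    return candidate in ALLOWED_ROLES

print(is_assignable(role))       # False
print(is_assignable(sanitized))  # False
```

An allow list fails closed: any value that is not an exact, known role is rejected, no matter how it is later transformed.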


SQL Injection

In the admin panel, users have a search function to look up notes.

By entering a single quote (') in the search box, we trigger an error that reveals the database query structure due to the application running in debug mode. We'll address this debug setting later as part of our improvements.

For now, let’s take a closer look at the query itself in the source code. In the /admin/dashboard route, we see the following statement:

    query = request.args.get('query', '')
    notes = []

    if query:
        sql = text(f"SELECT * FROM note WHERE data LIKE '%{query}%'")
        notes = db.session.execute(sql).fetchall()

    rendered_notes = [render_template_string(note.data) for note in notes]

    return render_template("admin.html", user=current_user, query=query, notes=rendered_notes)

Here, the search parameter is directly concatenated into the SQL statement, which makes the query vulnerable to SQL injection. With this exposure, an attacker could potentially inject code like the one below to retrieve sensitive information, such as user passwords.

' UNION SELECT 1,password,3,4 FROM user--

To fix this vulnerability, we can use SQLAlchemy’s parameterized queries, which avoid direct concatenation of user inputs and ensure the input is safely processed.

if query:
    sql = text("SELECT * FROM note WHERE data LIKE :query")
    notes = db.session.execute(sql, {'query': f'%{query}%'}).fetchall()

After implementing this fix, any attempt to inject SQL through the query parameter will be interpreted only as text, preserving database security and query integrity.
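
The difference between the two approaches can be reproduced with an in-memory SQLite database and the standard sqlite3 module. The schema below is a simplified assumption (three columns per note, hence a three-column UNION), not the application's real tables.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE user (id INTEGER PRIMARY KEY, email TEXT, password TEXT);
    CREATE TABLE note (id INTEGER PRIMARY KEY, data TEXT, user_id INTEGER);
    INSERT INTO user VALUES (1, 'admin@example.com', 's3cret');
    INSERT INTO note VALUES (1, 'buy milk', 1);
""")

payload = "' UNION SELECT 1, password, 3 FROM user--"

# Vulnerable: the payload becomes part of the SQL statement itself.
unsafe_sql = f"SELECT * FROM note WHERE data LIKE '%{payload}%'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Parameterized: the payload is bound as a plain string value.
safe_rows = conn.execute(
    "SELECT * FROM note WHERE data LIKE :query",
    {"query": f"%{payload}%"},
).fetchall()

print(unsafe_rows)  # includes a row carrying the password
print(safe_rows)    # []
```

With the bound parameter, the database looks for a note that literally contains the payload text and finds none.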


SSTI

In the code analyzed just now, we noticed a curious function. The application uses render_template_string, which means it directly interprets the content of note.data as a template.

Since we control the content in the notes tab, we can inject code to execute commands.

rendered_notes = [render_template_string(note.data) for note in notes]

return render_template("admin.html", user=current_user, query=query, notes=rendered_notes)

For example, using the following payload, we can save it as a note and then trigger it through a search on the admin panel.

This executes the whoami command, which returns pc\eduar.

Additionally, we can achieve a reverse shell with this payload:

💡
Since this is a white-box test, we’ve placed nc64.exe in the application’s folder to establish the reverse shell. This setup may differ if testing in a restricted or Linux environment.

If render_template_string were necessary, we could escape the note content to mitigate this risk. However, because the functionality does not require template rendering, we can simply drop the render_template_string call and pass the notes straight to render_template, which displays them safely.

return render_template("admin.html", user=current_user, query=query, notes=notes)

To complete the fix, update admin.html as follows, changing it from:

<ul class="list-group">
    {% for note in notes %}
    <li class="list-group-item">{{ note }}</li>
    {% endfor %}
</ul>

To:

<ul class="list-group">
    {% for note in notes %}
    <li class="list-group-item">{{ note.data }}</li>
    {% endfor %}
</ul>

Now, the content of each note displays as text, preventing any code execution.


SSRF

In this section, we will examine the "Fetch Resource from URL" functionality to identify potential vulnerabilities.

When we enter https://google.com, the website is successfully displayed, indicating that the functionality is working.

However, upon reviewing the code, we notice that the url parameter is directly passed to the requests library to retrieve content.

@views.route('/admin/fetch-url', methods=['POST'])
@login_required
def fetch_url():
    if current_user.role != 'administrator':
        return "Access Denied", 403

    url = request.form.get('url')

    try:
        response = requests.get(url)
        return response.content
    except Exception as e:
        return jsonify({"error": str(e)}), 400

Since there is no input sanitization, an attacker could exploit this vulnerability to perform Server-Side Request Forgery (SSRF) by targeting internal addresses.

To demonstrate this, I started a web server hosting a secret page on port 80. The following payload successfully reveals the internal content:

To address this vulnerability, we need to sanitize user input to prevent access to any internal addresses.

The first step is to check the protocol and ensure it is either HTTP or HTTPS. An attacker might exploit unsupported protocols to perform SSRF attacks with potentially severe consequences.

parsed_url = urlparse(url)  # requires: from urllib.parse import urlparse
if parsed_url.scheme not in ["http", "https"]:
    return jsonify({"error": "Invalid URL scheme"}), 400

Ideally, the best practice for preventing SSRF attacks involves implementing an allow list of trusted domains. For example:

TRUSTED_DOMAINS = ["example.com", "another-trusted-domain.com"]

hostname = parsed_url.hostname
if hostname not in TRUSTED_DOMAINS:
    return jsonify({"error": "Access to this host is restricted"}), 403
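
As a standalone illustration of the allow-list idea, the sketch below wraps the scheme and hostname checks in a hypothetical is_trusted helper and runs it against a few lookalike URLs. The evil.test hosts are made-up examples.

```python
from urllib.parse import urlparse

TRUSTED_DOMAINS = {"example.com", "another-trusted-domain.com"}

def is_trusted(url: str) -> bool:
    """Return True only for http(s) URLs whose host is exactly on the allow list."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        return False
    # hostname is lowercased by urlparse and excludes port and credentials
    return parsed.hostname in TRUSTED_DOMAINS

print(is_trusted("https://example.com/page"))             # True
print(is_trusted("https://example.com.evil.test/"))       # False
print(is_trusted("https://evil.test/?next=example.com"))  # False
print(is_trusted("https://example.com@evil.test/"))       # False
```

Comparing the parsed hostname for exact membership avoids the substring and prefix tricks shown in the last three calls.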

When an allow list is too restrictive for the feature, we can instead define forbidden IP ranges using ip_network from the ipaddress module. This allows us to block access to internal addresses, including localhost and private IP ranges:

from ipaddress import ip_network

FORBIDDEN_IP_RANGES = [
    ip_network("127.0.0.0/8"),        # Loopback
    ip_network("10.0.0.0/8"),         # Private
    ip_network("172.16.0.0/12"),      # Private
    ip_network("192.168.0.0/16"),     # Private
    ip_network("169.254.0.0/16"),     # Link-local
    ip_network("::1/128"),            # IPv6 loopback
    ip_network("fc00::/7"),           # IPv6 unique local
    ip_network("fe80::/10"),          # IPv6 link-local
]

# Validate host and avoid private IPs
hostname = parsed_url.hostname
if re.match(r"^(localhost|127\.|0\.0\.0\.0|::1|10\.|192\.168\.|172\.(1[6-9]|2[0-9]|3[01]))", hostname):
    return jsonify({"error": "Access to this host is restricted"}), 403

Additionally, we can perform DNS resolution to catch common bypass attempts, such as encoded addresses or hostnames that resolve to an internal IP. Comparing the resolved IP address against the forbidden ranges further enhances security.

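try:
    # Resolve hostname to an IP and validate it
    resolved_ip = ip_address(socket.gethostbyname(hostname))

    # Check if the resolved IP falls within forbidden ranges
    if any(resolved_ip in net for net in FORBIDDEN_IP_RANGES):
        return jsonify({"error": "Access to this IP is restricted"}), 403
except socket.gaierror:
    # Assumed handling: reject hostnames that cannot be resolved
    return jsonify({"error": "Unable to resolve hostname"}), 400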
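
Putting the pieces together, the sketch below combines the scheme check, the forbidden ranges, and the resolution step into one standalone function. The helper names resolve_all and is_url_allowed are hypothetical, the 0.0.0.0/8 range is added to mirror the regex check, and getaddrinfo is used in place of gethostbyname so that IPv6 results are covered as well.

```python
import socket
from ipaddress import ip_address, ip_network
from urllib.parse import urlparse

FORBIDDEN_IP_RANGES = [ip_network(n) for n in (
    "127.0.0.0/8", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16",
    "169.254.0.0/16", "0.0.0.0/8", "::1/128", "fc00::/7", "fe80::/10",
)]

def resolve_all(hostname: str):
    """Return every IP address the hostname maps to; IP literals are parsed directly."""
    try:
        return [ip_address(hostname)]
    except ValueError:
        pass  # not a literal, fall through to DNS
    infos = socket.getaddrinfo(hostname, None)
    # Strip any IPv6 zone suffix (for example "%eth0") before parsing.
    return [ip_address(info[4][0].split("%")[0]) for info in infos]

def is_url_allowed(url: str) -> bool:
    """Return True when the URL is http(s) and none of its addresses is internal."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        return False
    try:
        addresses = resolve_all(parsed.hostname)
    except (socket.gaierror, ValueError):
        return False  # unresolvable hosts are rejected
    return not any(addr in net for addr in addresses for net in FORBIDDEN_IP_RANGES)

print(is_url_allowed("http://127.0.0.1/secret"))  # False
print(is_url_allowed("ftp://8.8.8.8/"))           # False
print(is_url_allowed("http://8.8.8.8/"))          # True
```

This is still a sketch with known gaps: requests follows redirects by default, and the hostname is resolved again when the request is actually made, so a redirect or a changed DNS answer could still reach an internal address after the check has passed.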

Conclusion

In today's article, we fixed six vulnerabilities, making our application significantly more secure. In the next chapter, we will focus on enhancing the overall security of the app, though we won't be specifically addressing vulnerabilities.

For these fixes, we’re still on branch fixing and the commit reference is 1aeff150dc58f7d4d3c060b665b7902db6f01de5.

See ya =)
