ML Pipelines

Vājradēvaṁ
18 min read

Machine learning is often taught in a shallow way. By that I mean most people do not know what they are doing. They import stuff, hit the run button in their Jupyter notebook and get a working model, without any idea of how the dataset even gets created or how the model is deployed. This is my blog and my rant; you can cry over it, or you can learn. Learn the whole pipeline end to end, or stay a frog in your well like everyone who doesn't know what they are doing and still calls themselves a machine learning engineer.

Machine learning models are useful only when they can be deployed and accessed in real-world applications. This guide will walk through training a simple model and deploying it using FastAPI.

Bruv, before installing these dependencies you should make a virtual environment for your Python so you don't pollute everything else on your system. If you are on Windows, I am sorry, you are in the wrong field. If you are on Linux, you already know what to do.
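If you don't, here is a minimal sketch using the built-in venv module (assuming python3 and a bash-like shell):

python3 -m venv .venv
source .venv/bin/activate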

Install Dependencies

pip install fastapi uvicorn scikit-learn joblib numpy pandas

Train a Simple Model

Create a file train.py to generate data, train a model, and save it. Or run the same code in a Jupyter notebook to get the model.

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
import joblib

# Generate a simple dataset
data_size = 100
np.random.seed(42)  # fixed seed for reproducibility (must fit in 32 bits)
X = np.random.rand(data_size, 1) * 10  
y = 2 * X.squeeze() + 3 + np.random.randn(data_size) * 2 

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train a simple model
model = LinearRegression()
model.fit(X_train, y_train)

# Save the trained model
joblib.dump(model, 'model.pkl')

This saves the model as model.pkl.

`.pkl` is pickle. Pickling here is like real-world pickling, where you preserve mangoes and lemons to eat later: the trained model gets serialized to disk so it can be loaded back later. Same same.
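As a quick sanity check that the pickle round-trips, a small sketch (run it from the same directory as model.pkl):

import joblib

# load the serialized model back and predict for one sample
model = joblib.load('model.pkl')
print(model.predict([[5.0]]))  # should land roughly at 2*5 + 3 = 13, give or take the noise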

Create a FastAPI App

FastAPI was created by this guy:

Sebastian Ramirez | Keynote Speaker | PyCon Colombia 2021

Do you think you'll ever be as smart as him? You use what he builds; you couldn't even ask him a question in an interview, because he made the stuff you depend on every day.

Create a file app.py:

from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import numpy as np

# Initialize FastAPI app
app = FastAPI()

# Load trained model
model = joblib.load('model.pkl')

# Define request model
class ModelInput(BaseModel):
    features: list[float]

@app.get('/')
def home():
    return {"message": "FastAPI model is running!"}

@app.post('/predict')
def predict(data: ModelInput):
    features = np.array(data.features).reshape(1, -1)  # Ensure correct shape
    prediction = model.predict(features)
    return {"prediction": prediction.tolist()}

Run the FastAPI Server

Uvicorn is a minimal ASGI server implementation for Python. We could have done all of this with Node.js and friends, but we are staying in Python here.

Start the server using Uvicorn:

uvicorn app:app --host 0.0.0.0 --port 8000 --reload

Test the API

Using Swagger UI

Go to http://127.0.0.1:8000/docs

Using cURL (Windows folks can look away for this one):

curl -X 'POST' 'http://127.0.0.1:8000/predict' -H 'Content-Type: application/json' -d '{"features": [5.0]}'
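If everything is wired up, the response will look something like this (the exact number depends on the random training data, but since y ≈ 2x + 3, an input of 5.0 should land near 13):

{"prediction": [13.1]}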

Important observation

This way of offering a model as a service ("MaaS" I'll call it, because this is my blog and Aman likes to talk in third person, why not?) is model agnostic. Any model can be pickled, and the input features can be shaped however you wish inside the application. So we have a clean separation between creating a model and serving it, one that does not depend on the dataset. Noice!
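For example, swapping in a different scikit-learn model changes nothing on the serving side; only train.py changes. A sketch, assuming the same X_train and y_train as above:

from sklearn.ensemble import RandomForestRegressor
import joblib

# any estimator with a .predict() method can be dropped in;
# the FastAPI app keeps loading 'model.pkl' exactly as before
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
joblib.dump(model, 'model.pkl')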

Real-Life Implementation

So, real life isn't fun and games. You won't have the dataset, and the people giving you the data often won't themselves know what it is about or how any of it works. I am going to present a problem and its solution, which will blow my mind, because I am a narcissistic person who is awake at 1 a.m. after chugging so much coffee that I am typing insane things, oh my gawd.

Problem: Designing Antennas

We all take antennas for granted. Your cell phone, PC, laptop: anything wireless uses an antenna. An antenna translates electric signals into EM waves which can travel without any physical link, i.e. Wi-Fi hotspots, Bluetooth, etc.

Nodemcu ESP32-WROOM-32E Board

See that up-down-up-down trace on the ESP32? That is an antenna, specifically a microstrip patch antenna. That specific structure is responsible for the wireless function (how, don't ask). Designing these antennas is difficult, because you don't know in advance what shape you need for a specific function. There are some key properties of an antenna: S11, gain and frequency. Others exist, but we don't care about those here.

S11: how much of the power being fed to the antenna is reflected back. The lower, the better.

Gain: how directional the antenna's radiation is.

Frequency: the frequency at which the antenna resonates.

The shape, structure and size of the antenna decide these properties, but simulating each iteration takes a long time, let alone fabricating and testing.

But what if we had enough data for a specific design that we could interpolate between the points and find the perfect antenna?

Yes, we do. Simulations can be run in software such as CST Studio and Ansys, the data can be exported, and then we can do the machine learning.


Unfortunately, it's not so easy. The raw output produced by the software is messy; you need to turn it into a proper dataset with feature and output vectors, basically turning the dump into a proper supervised learning task. This is the first stage of any pipeline. Here is a look at the data:

S11

#Parameters = {fl=11; fw=1.5; gt=0.4; inl=2; inw=0.15; k=4.5; k_mul=0.5; pl=13; pt=0.4; pw=23.7; sl=35; st=3; sw=55; x=1}
#"Frequency / THz"    "S1,1 (1) [Magnitude]"
#----------------------------------------
0.10000000000000    -0.21915244880893
0.10490000000000    -0.22501818589599
0.10980000000000    -0.23097363186125
0.11470000000000    -0.23701309451453
0.11960000000000    -0.24313087781245
0.12450000000000    -0.24932128544623
0.12940000000000    -0.25557862453194
0.13430000000000    -0.26189720940188

Gain

#Parameters = {fl=11; fw=1.5; gt=0.4; inl=2; inw=0.15; k=4.5; k_mul=0.5; pl=13; pt=0.4; pw=23.7; sl=35; st=3; sw=55; x=1}
#"Frequency / THz"    "Gain (IEEE),3D,Max. Value (Solid Angle) (1) [Real Part]"
#---------------------------------------------------------------------------
0.10000000000000    -43.803419911702
0.59000000000000    -15.000965803464
1.0800000000000    -8.9299558451649
1.5700000000000    -14.278192668691
2.0600000000000    -9.4267861530604
2.5500000000000    -6.0815920991222
3.0400000000000    -1.6797698165974
3.5300000000000    1.9915585100194

Pretty much un-learnable at the moment. What we need to do is extract each training example from this raw dump and build a vector with the frequency, gain, S11 and the parameters which were varied.

Doing this manually is labour intensive and error prone. It took 3 people a week, working 12 hours a day, to get 1000 training examples out of this. The pipeline I built does the same job in 5 seconds. Cool, isn't it?

Let's take a deep dive (as ChatGPT likes to say) into the dataset creator:

from __future__ import division 
import customtkinter
from customtkinter import filedialog

import numpy as np
import csv

customtkinter.set_appearance_mode("dark")
customtkinter.set_default_color_theme("dark-blue")

def extract( 
        s11_filename, 
        gain_filename, 
        save_file,
        bool_get_s11,
        bool_get_gain,
        bool_get_freq,
        bool_get_bandwidth):

    def interpolated_intercepts(x, y1, y2):
        """Find the intercepts of two curves, given by the same x data"""

        def intercept(point1, point2, point3, point4):
            """find the intersection between two lines
            the first line is defined by the line between point1 and point2
            the second line is defined by the line between point3 and point4
            each point is an (x,y) tuple.

            So, for example, you can find the intersection between
            intercept((0,0), (1,1), (0,1), (1,0)) = (0.5, 0.5)

            Returns: the intercept, in (x,y) format
            """    

            def line(p1, p2):
                A = (p1[1] - p2[1])
                B = (p2[0] - p1[0])
                C = (p1[0]*p2[1] - p2[0]*p1[1])
                return A, B, -C

            def intersection(L1, L2):
                D  = L1[0] * L2[1] - L1[1] * L2[0]
                Dx = L1[2] * L2[1] - L1[1] * L2[2]
                Dy = L1[0] * L2[2] - L1[2] * L2[0]

                x = Dx / D
                y = Dy / D
                return x,y

            L1 = line([point1[0],point1[1]], [point2[0],point2[1]])
            L2 = line([point3[0],point3[1]], [point4[0],point4[1]])

            R = intersection(L1, L2)

            return R

        idxs = np.argwhere(np.diff(np.sign(y1 - y2)) != 0)

        xcs = []
        ycs = []

        for idx in idxs:
            xc, yc = intercept((x[idx], y1[idx]),((x[idx+1], y1[idx+1])), ((x[idx], y2[idx])), ((x[idx+1], y2[idx+1])))
            xcs.append(xc)
            ycs.append(yc)
        return np.array(xcs), np.array(ycs)

    with open(s11_filename, "r") as f:
        s11_file = [x for x in f.readlines()]

    with open(gain_filename, "r") as f:
        gain_file = [x for x in f.readlines()]

    temp = []
    # get the offsets
    for entry in s11_file[1:]:
        if "Parameters" not in entry:
            temp.append(entry)
        else:
            break
    s11_offset_multiplier = len(temp) + 1

    temp = []
    # get the offsets
    for entry in gain_file[1:]:
        if "Parameters" not in entry:
            temp.append(entry)
        else:
            break
    gain_offset_multiplier = len(temp) + 1

    temp = s11_file[0]
    headers = [x.split("=")[0] for x in temp[15:-2].split("; ")]

    if bool_get_freq: headers.append("freq")
    if bool_get_s11: headers.append("s11")
    if bool_get_gain: headers.append("gain")
    if bool_get_bandwidth: headers.append("bandwidth")

    with open(save_file, "a", newline='') as f:
        writer = csv.writer(f, delimiter=',')
        writer.writerow(headers)

    def get_one(chunk_s11, chunk_gain):

        freqs1 = [float(x[:-1].split('\t')[0]) for x in chunk_gain[3:]]
        gains = [float(x[:-1].split('\t')[1]) for x in chunk_gain[3:]]

        freqs2 = [float(x[:-1].split('\t')[0]) for x in chunk_s11[3:]]
        s11s = [float(x[:-1].split('\t')[1]) for x in chunk_s11[3:]]

        min_s11 = min(s11s)
        freq = freqs2[s11s.index(min_s11)]
        gain = np.interp(freq, freqs1, gains)

        row = [float(x.split("=")[1]) for x in chunk_s11[0][15:-2].split("; ")]

        if bool_get_freq: row.append(freq)
        if bool_get_s11: row.append(min_s11)
        if bool_get_gain: row.append(gain)

        const = []
        for i in range(len(freqs2)):
            const.append(-10)

        s11_crossers, vals = interpolated_intercepts(np.array(freqs2), np.array(const), np.array(s11s))
        d = [x[0] for x in s11_crossers]

        if len(d) % 2 != 0:
            d = d[:-1]

        if bool_get_bandwidth: 
            for i in range(0, len(d), 2):
                if d[i] < freq < d[i + 1]:
                    """importane"""
                    bandwidth = d[i + 1] - d[i]
                    break
            try:
                row.append(bandwidth)
            except UnboundLocalError:
                print(freq, d)

        with open(save_file, "a", newline='') as f:
            writer = csv.writer(f, delimiter=',')
            writer.writerow(row)

    iters = (len(s11_file) // s11_offset_multiplier)

    for i in range(iters):
        current_s11_data = s11_file[-1 * s11_offset_multiplier:]
        current_gain_data = gain_file[-1 * (gain_offset_multiplier):]

        get_one(current_s11_data, current_gain_data)

        s11_file = s11_file[:-1 * s11_offset_multiplier]
        gain_file = gain_file[:-1 * gain_offset_multiplier]

    return 1

class CheckBoxFrame(customtkinter.CTkFrame):
    def __init__(self, master):
        super().__init__(master)

        # Parameters Checkbox
        self.label_params = customtkinter.CTkLabel(self, text="Parameters")
        self.label_params.grid(row=0, column=0, padx=10, pady=(10, 0), sticky="w")

        # Checkboxes
        self.checkbox_s11 = customtkinter.CTkCheckBox(self, text="S11")
        self.checkbox_s11.grid(row=1, column=0, padx=10, pady=(10, 0), sticky="w")

        self.checkbox_gain = customtkinter.CTkCheckBox(self, text="GAIN")
        self.checkbox_gain.grid(row=1, column=1, padx=10, pady=(10, 0), sticky="w")

        self.checkbox_freq = customtkinter.CTkCheckBox(self, text="FREQ")
        self.checkbox_freq.grid(row=2, column=0, padx=10, pady=(10, 0), sticky="w")

        self.checkbox_bandwidth = customtkinter.CTkCheckBox(self, text="BANDWIDTH")
        self.checkbox_bandwidth.grid(row=2, column=1, padx=10, pady=(10, 0), sticky="w")

    def return_checkboxes(self):
        return (self.checkbox_s11, self.checkbox_gain, self.checkbox_freq, self.checkbox_bandwidth)

class ButtonFrame(customtkinter.CTkFrame):
    def __init__(self, master):
        super().__init__(master)

        self.s11_button = customtkinter.CTkButton(self, text="S11 File", command=self.select_s11_file)
        self.s11_button.grid(row=0, column=0, padx=10, pady=(10, 0), sticky="w")

        self.gain_button = customtkinter.CTkButton(self, text="Gain File", command=self.select_gain_file)
        self.gain_button.grid(row=1, column=0, padx=10, pady=(10, 0), sticky="w")

        self.savefile_button = customtkinter.CTkButton(self, text="Result File", command=self.select_result_file)
        self.savefile_button.grid(row=2, column=0, padx=10, pady=(10, 0), sticky="w")

        self.s11_file_name = ""
        self.gain_file_name = ""
        self.result_file_name = ""

        # File Text Boxes
        self.text_box_s11 = customtkinter.CTkTextbox(self, height=1, width=200)
        self.text_box_s11.grid(row=3, column=1, padx=10, pady=(10, 0), sticky="w")

        self.text_box_gain = customtkinter.CTkTextbox(self, height=1, width=200)
        self.text_box_gain.grid(row=4, column=1, padx=10, pady=(10, 0), sticky="w")

        self.text_box_result = customtkinter.CTkTextbox(self, height=1, width=200)
        self.text_box_result.grid(row=5, column=1, padx=10, pady=(10, 0), sticky="w")

        # File Labels
        self.label_s11_file = customtkinter.CTkLabel(self, text="S11 File: ")
        self.label_s11_file.grid(row=3, column=0, padx=10, pady=(10, 0), sticky="w")

        self.label_gain_file = customtkinter.CTkLabel(self, text="Gain File: ")
        self.label_gain_file.grid(row=4, column=0, padx=10, pady=(10, 0), sticky="w")

        self.label_save_file = customtkinter.CTkLabel(self, text="Save File: ")
        self.label_save_file.grid(row=5, column=0, padx=10, pady=(10, 0), sticky="w")

    def select_s11_file(self):
        self.s11_file_name = filedialog.askopenfilename()
        self.text_box_s11.insert("0.0", text = self.s11_file_name)

    def select_gain_file(self):
        self.gain_file_name = filedialog.askopenfilename()
        self.text_box_gain.insert("0.0", text = self.gain_file_name)

    def select_result_file(self):
        self.result_file_name = filedialog.askopenfilename()
        self.text_box_result.insert("0.0", text = self.result_file_name)

    def return_file_names(self):
        return (self.s11_file_name, self.gain_file_name, self.result_file_name)

class App(customtkinter.CTk):
    def __init__(self):
        super().__init__()

        self.title("CST Studio 2022 Data Pipeline 0.1.2")
        self.geometry("700x500")
        self.grid_columnconfigure(0, weight=1)
        self.grid_rowconfigure((0, 0), weight=1)

        self.checkbox_frame = CheckBoxFrame(self)
        self.checkbox_frame.grid(row=0, column=0, padx=10, pady=(10, 10), sticky="news")

        self.button_frame = ButtonFrame(self)
        self.button_frame.grid(row=0, column=1, padx=10, pady=(10, 10), sticky="news")

        self.button_compute = customtkinter.CTkButton(self, text="Calculate", command=self.perform_task)
        self.button_compute.grid(row=1, column=0, padx=10, pady=(10, 10), sticky="news")

        self.label_desc = customtkinter.CTkLabel(self, text="©Vajradevam S., Neptunes Aerospace, 2024")
        self.label_desc.grid(row=1, column=1, padx=10, pady=(10, 10), sticky="we")

    def perform_task(self):

        checkboxes = self.checkbox_frame.return_checkboxes()
        bool_get_s11 = checkboxes[0].get()
        bool_get_gain = checkboxes[1].get()
        bool_get_freq = checkboxes[2].get()
        bool_get_bandwidth = checkboxes[3].get()

        filenames = self.button_frame.return_file_names()

        s11_file_name = filenames[0]
        gain_file_name = filenames[1]
        result_file_name = filenames[2]

        status = extract(
            s11_filename=s11_file_name,
            gain_filename=gain_file_name,
            save_file=result_file_name,
            bool_get_s11=bool_get_s11,
            bool_get_gain=bool_get_gain,
            bool_get_freq=bool_get_freq,
            bool_get_bandwidth=bool_get_bandwidth)

app = App()
app.mainloop()

Whoa, this is too much, isn't it?

Let's take a look at the core functional block:


def extract( 
        s11_filename, 
        gain_filename, 
        save_file,
        bool_get_s11,
        bool_get_gain,
        bool_get_freq,
        bool_get_bandwidth):

    def interpolated_intercepts(x, y1, y2):
        """Find the intercepts of two curves, given by the same x data"""

        def intercept(point1, point2, point3, point4):
            """find the intersection between two lines
            the first line is defined by the line between point1 and point2
            the second line is defined by the line between point3 and point4
            each point is an (x,y) tuple.

            So, for example, you can find the intersection between
            intercept((0,0), (1,1), (0,1), (1,0)) = (0.5, 0.5)

            Returns: the intercept, in (x,y) format
            """    

            def line(p1, p2):
                A = (p1[1] - p2[1])
                B = (p2[0] - p1[0])
                C = (p1[0]*p2[1] - p2[0]*p1[1])
                return A, B, -C

            def intersection(L1, L2):
                D  = L1[0] * L2[1] - L1[1] * L2[0]
                Dx = L1[2] * L2[1] - L1[1] * L2[2]
                Dy = L1[0] * L2[2] - L1[2] * L2[0]

                x = Dx / D
                y = Dy / D
                return x,y

            L1 = line([point1[0],point1[1]], [point2[0],point2[1]])
            L2 = line([point3[0],point3[1]], [point4[0],point4[1]])

            R = intersection(L1, L2)

            return R

        idxs = np.argwhere(np.diff(np.sign(y1 - y2)) != 0)

        xcs = []
        ycs = []

        for idx in idxs:
            xc, yc = intercept((x[idx], y1[idx]),((x[idx+1], y1[idx+1])), ((x[idx], y2[idx])), ((x[idx+1], y2[idx+1])))
            xcs.append(xc)
            ycs.append(yc)
        return np.array(xcs), np.array(ycs)

    with open(s11_filename, "r") as f:
        s11_file = [x for x in f.readlines()]

    with open(gain_filename, "r") as f:
        gain_file = [x for x in f.readlines()]

    temp = []
    # get the offsets
    for entry in s11_file[1:]:
        if "Parameters" not in entry:
            temp.append(entry)
        else:
            break
    s11_offset_multiplier = len(temp) + 1

    temp = []
    # get the offsets
    for entry in gain_file[1:]:
        if "Parameters" not in entry:
            temp.append(entry)
        else:
            break
    gain_offset_multiplier = len(temp) + 1

    temp = s11_file[0]
    headers = [x.split("=")[0] for x in temp[15:-2].split("; ")]

    if bool_get_freq: headers.append("freq")
    if bool_get_s11: headers.append("s11")
    if bool_get_gain: headers.append("gain")
    if bool_get_bandwidth: headers.append("bandwidth")

    with open(save_file, "a", newline='') as f:
        writer = csv.writer(f, delimiter=',')
        writer.writerow(headers)

    def get_one(chunk_s11, chunk_gain):

        freqs1 = [float(x[:-1].split('\t')[0]) for x in chunk_gain[3:]]
        gains = [float(x[:-1].split('\t')[1]) for x in chunk_gain[3:]]

        freqs2 = [float(x[:-1].split('\t')[0]) for x in chunk_s11[3:]]
        s11s = [float(x[:-1].split('\t')[1]) for x in chunk_s11[3:]]

        min_s11 = min(s11s)
        freq = freqs2[s11s.index(min_s11)]
        gain = np.interp(freq, freqs1, gains)

        row = [float(x.split("=")[1]) for x in chunk_s11[0][15:-2].split("; ")]

        if bool_get_freq: row.append(freq)
        if bool_get_s11: row.append(min_s11)
        if bool_get_gain: row.append(gain)

        const = []
        for i in range(len(freqs2)):
            const.append(-10)

        s11_crossers, vals = interpolated_intercepts(np.array(freqs2), np.array(const), np.array(s11s))
        d = [x[0] for x in s11_crossers]

        if len(d) % 2 != 0:
            d = d[:-1]

        if bool_get_bandwidth: 
            for i in range(0, len(d), 2):
                if d[i] < freq < d[i + 1]:
                    """importane"""
                    bandwidth = d[i + 1] - d[i]
                    break
            try:
                row.append(bandwidth)
            except UnboundLocalError:
                print(freq, d)

        with open(save_file, "a", newline='') as f:
            writer = csv.writer(f, delimiter=',')
            writer.writerow(row)

    iters = (len(s11_file) // s11_offset_multiplier)

    for i in range(iters):
        current_s11_data = s11_file[-1 * s11_offset_multiplier:]
        current_gain_data = gain_file[-1 * (gain_offset_multiplier):]

        get_one(current_s11_data, current_gain_data)

        s11_file = s11_file[:-1 * s11_offset_multiplier]
        gain_file = gain_file[:-1 * gain_offset_multiplier]

    return 1

1. Reading Raw Data Files:

  • The function opens two input files (s11_filename and gain_filename) that contain the raw data.

  • Each file is read line by line, and the data is stored in two lists (s11_file and gain_file).

2. Finding Offsets for Data Sections:

  • For both files (s11_file and gain_file), the function searches for the section that starts with the word "Parameters."

  • The lines before "Parameters" are considered data lines, and their number determines the offset or the starting point for each chunk of data.

3. Creating the Header Row:

  • The first line of the s11_filename contains some metadata (in a format like key=value).

  • It extracts the parameter names from this line (like fl, fw, pl) to form the first header columns, then appends freq, s11, gain and bandwidth as needed.

  • The header row is customized based on the flags (bool_get_freq, bool_get_s11, etc.) to decide which columns should be included in the final dataset.

4. Opening a New CSV File:

  • A new CSV file (save_file) is opened to store the processed data.

  • The header row (from the previous step) is written into this CSV file.

5. Processing Each Chunk of Data:

  • The data from both files (s11_file and gain_file) is processed in chunks. Each chunk corresponds to one entry in the dataset.

  • For each chunk:

    • It extracts frequency and gain data from gain_file.

    • It extracts frequency and s11 values from s11_file.

    • The function finds the frequency with the minimum s11 value (important for the dataset) and calculates the gain at that frequency by interpolating between the available data points.

    • The s11 and gain values are combined into a single row.

6. Calculating Bandwidth (Optional):

  • The function finds where the S11 curve crosses -10 dB (the usual threshold for defining bandwidth).

  • If the flag bool_get_bandwidth is set to True, it calculates the bandwidth where the s11 value crosses this threshold and appends this value to the row.

7. Appending Data to CSV File:

  • Once the row is completed, it is written into the CSV file (save_file).

8. Repeating for All Data:

  • The function processes all chunks of data by repeating steps 5-7, iterating through the data files.

  • As it processes each chunk, the raw data is reduced by removing the processed sections.

9. Final Result:

  • After processing all chunks of data, the CSV file contains a structured dataset with columns for frequency, s11, gain, and optionally bandwidth.

So, in summary:

  • Raw data is read from files.

  • Offsets are determined to identify the data sections.

  • Headers are created dynamically.

  • Each chunk of data is processed and stored with necessary calculations (like interpolating gain, finding minimum s11, and calculating bandwidth).

  • The result is saved as a structured CSV dataset.

I generated a ChatGPT summary of how this code works (that is the explanation above), because even I don't remember the details anymore; it's been more than a year since I first wrote it. Is it inefficient? Yes. Does it get the job done? Also yes.
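If you want to drive the extractor without the GUI, here is a minimal sketch. It assumes you have copied the extract function above into its own module, say extractor.py (don't import the GUI script directly, since it launches the app at import time), and that the CST export files sit next to it:

from extractor import extract

# parse both export files and append one row per simulation run to data.csv,
# keeping the resonant frequency, S11, gain and bandwidth columns
extract(
    s11_filename="s11.txt",
    gain_filename="gain.txt",
    save_file="data.csv",
    bool_get_s11=True,
    bool_get_gain=True,
    bool_get_freq=True,
    bool_get_bandwidth=True,
)

Either way, we end up with our dataset, which looks like this: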

fl,fw,gt,inl,inw,k,k_mul,pl,pt,pw,sl,st,sw,x,freq,s11,gain,bandwidth
7.5,3.5,0.4,7.0,0.35,4.5,0.5,20.0,0.4,23.7,35.0,3.0,55.0,3.5,3.3634,-15.57056450687,2.658203751200566,0.18545966095168964
8.0,3.5,0.4,7.0,0.35,4.5,0.5,19.0,0.4,23.7,35.0,3.0,55.0,3.5,3.5398,-22.35795735977,4.047312520474252,0.2298745969771261
8.5,3.5,0.4,7.0,0.35,4.5,0.5,18.0,0.4,23.7,35.0,3.0,55.0,3.5,3.7211,-37.020459765932,4.51979263244319,0.261951517039972

Wow, now we can actually do machine learning. The rest you are already familiar with: train a model and serve it, same as before.

Obviously, the dependencies first:

pip install fastapi uvicorn joblib torch numpy pandas scikit-learn matplotlib pydantic

We train our model using:

import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import torch
from torch import nn
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import joblib

# Load dataset
data = pd.read_csv('data.csv')

# Define xcols and ycols
xcols = ['fw', 'inl', 'pl', 'x']
ycols = ['freq', 's11', 'gain']

# Separate features and targets
X = data[xcols].values
y = data[ycols].values

# Split data into train, validation, and test sets (70-15-15 split) BEFORE scaling
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
# Initialize scalers
scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()

# Fit only on training data
X_train = scaler_X.fit_transform(X_train)
y_train = scaler_y.fit_transform(y_train)

# Transform validation and test data using the same scaler
X_val = scaler_X.transform(X_val)
y_val = scaler_y.transform(y_val)
X_test = scaler_X.transform(X_test)
y_test = scaler_y.transform(y_test)

# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

class FeedForwardNN(nn.Module):
    def __init__(self, input_dim, output_dim, dropout_rate=0.3):
        super(FeedForwardNN, self).__init__()
        self.layer1 = nn.Linear(input_dim, 256)
        self.layer2 = nn.Linear(256, 256)
        self.layer3 = nn.Linear(256, 128)
        self.layer4 = nn.Linear(128, 64)
        self.layer5 = nn.Linear(64, 32)
        self.layer6 = nn.Linear(32, output_dim)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=dropout_rate)  # Dropout with given probability

    def forward(self, x):
        x = self.relu(self.layer1(x))
        x = self.dropout(x)  # Dropout after activation
        x = self.relu(self.layer2(x))
        x = self.dropout(x)
        x = self.relu(self.layer3(x))
        x = self.dropout(x)
        x = self.relu(self.layer4(x))
        x = self.dropout(x)
        x = self.relu(self.layer5(x))
        x = self.layer6(x)  # No dropout on output layer
        return x
# Initialize model
model = FeedForwardNN(input_dim=X_train.shape[1], output_dim=y_train.shape[1])

# Define loss function and optimizer with L2 regularization (weight decay)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)  # L2 regularization

# Training loop
epochs = 500
train_losses = []
val_losses = []

for epoch in range(epochs):
    model.train()

    # Forward pass
    predictions = model(X_train)
    train_loss = criterion(predictions, y_train)

    # Backward pass
    optimizer.zero_grad()
    train_loss.backward()
    optimizer.step()

    # Validation loss
    model.eval()
    with torch.no_grad():
        val_predictions = model(X_val)
        val_loss = criterion(val_predictions, y_val)

    # Record losses
    train_losses.append(train_loss.item())
    val_losses.append(val_loss.item())

    # Print training progress
    if (epoch + 1) % 50 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss.item():.6f}, Validation Loss: {val_loss.item():.6f}")

# Plot the train and validation loss over time
plt.figure(figsize=(10, 6))
plt.plot(range(epochs), train_losses, label="Train Loss")
plt.plot(range(epochs), val_losses, label="Validation Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.title("Training vs Validation Loss with L2 Regularization")
plt.show()
# Make predictions on the test set
model.eval()
with torch.no_grad():
    y_pred = model(X_test)

# Inverse scale the predictions and true values
y_pred_inv = scaler_y.inverse_transform(y_pred.numpy())
y_test_inv = scaler_y.inverse_transform(y_test.numpy())

# Plot real vs predicted
plt.figure(figsize=(10, 6))
for i in range(y_pred_inv.shape[1]):
    plt.plot(y_test_inv[:, i], label=f'Real {ycols[i]}')
    plt.plot(y_pred_inv[:, i], label=f'Predicted {ycols[i]}', linestyle='--')
plt.xlabel("Sample Index")
plt.ylabel("Value")
plt.legend()
plt.title("Real vs Predicted Values")
plt.show()
# Calculate MSE, MAE, RMSE, R^2 on the test set
mse = mean_squared_error(y_test_inv, y_pred_inv)
mae = mean_absolute_error(y_test_inv, y_pred_inv)
rmse = np.sqrt(mse)
r2 = r2_score(y_test_inv, y_pred_inv)

print(f"MSE: {mse}")
print(f"MAE: {mae}")
print(f"RMSE: {rmse}")
print(f"R^2: {r2}")
# Also save the scalers
joblib.dump(scaler_X, 'scaler_X.pkl')
joblib.dump(scaler_y, 'scaler_y.pkl')
torch.save(model.state_dict(), 'model.pth')

And make a FastAPI app:

import torch
import torch.nn as nn
import joblib
from fastapi import FastAPI
from pydantic import BaseModel
import numpy as np

class FeedForwardNN(nn.Module):
    def __init__(self, input_dim, output_dim, dropout_rate=0.3):
        super(FeedForwardNN, self).__init__()
        self.layer1 = nn.Linear(input_dim, 256)
        self.layer2 = nn.Linear(256, 256)
        self.layer3 = nn.Linear(256, 128)
        self.layer4 = nn.Linear(128, 64)
        self.layer5 = nn.Linear(64, 32)
        self.layer6 = nn.Linear(32, output_dim)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=dropout_rate)  # Dropout with given probability

    def forward(self, x):
        x = self.relu(self.layer1(x))
        x = self.dropout(x)  # Dropout after activation
        x = self.relu(self.layer2(x))
        x = self.dropout(x)
        x = self.relu(self.layer3(x))
        x = self.dropout(x)
        x = self.relu(self.layer4(x))
        x = self.dropout(x)
        x = self.relu(self.layer5(x))
        x = self.layer6(x)  # No dropout on output layer
        return x

# Load the model architecture
model = FeedForwardNN(input_dim=4, output_dim=3)
model.load_state_dict(torch.load('model.pth')) 
model.eval()

# Load the scalers
scaler_X = joblib.load('scaler_X.pkl')
scaler_y = joblib.load('scaler_y.pkl')

# FastAPI setup
app = FastAPI()

class InputData(BaseModel):
    fw: float
    inl: float
    pl: float
    x: float

class OutputData(BaseModel):
    freq: float
    s11: float
    gain: float

@app.post("/predict", response_model=OutputData)
def predict(input_data: InputData):
    input_array = np.array([[input_data.fw, input_data.inl, input_data.pl, input_data.x]])
    input_scaled = scaler_X.transform(input_array)
    input_tensor = torch.tensor(input_scaled, dtype=torch.float32)

    with torch.no_grad():
        prediction_scaled = model(input_tensor).numpy()

    prediction = scaler_y.inverse_transform(prediction_scaled)
    return OutputData(freq=prediction[0][0], s11=prediction[0][1], gain=prediction[0][2])

Run the FastAPI app using:

uvicorn app:app --host 0.0.0.0 --port 8000 --reload

and going to the Swagger UI at http://127.0.0.1:8000/docs, we can check examples from the test set to verify that everything is working smoothly.

You can also use cURL.
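Same drill as before; the field names match the InputData model, and these particular values are lifted from the first row of the sample dataset above, so treat the exact numbers as illustrative:

curl -X 'POST' 'http://127.0.0.1:8000/predict' -H 'Content-Type: application/json' -d '{"fw": 3.5, "inl": 7.0, "pl": 20.0, "x": 3.5}'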

And hence we get a good-enough estimate of a design in milliseconds, where a single simulation previously took 30-40 minutes on the same hardware, boosting the design speed tremendously, as ChatGPT would put it.

This is it, the start to end of an ML pipeline, and trust me, if you understand this, ML jobs are yours. They won't be yours if you keep doing cargo-cult ML inside a notebook in Anaconda. Get your hands dirty, learn, enjoy.

Made with hate, By Vajra!

PS: if you want to play with the dataset and the data extractor:

extractor:
https://github.com/vajradevam/cst-studio-pipeline

datasets:
https://github.com/vajradevam/ad-lab/blob/main/day8/advanced/s11.txt
https://github.com/vajradevam/ad-lab/blob/main/day8/advanced/gain.txt

cleaned dataset:
https://github.com/vajradevam/ad-lab/blob/main/day8/advanced/data.csv

input features (basically the parameters that were varied in the simulation to generate data): xcols = ['fw', 'inl', 'pl', 'x']
output features: ycols = ['freq', 's11', 'gain']
