ML Pipelines

Machine Learning is often taught in a shallow way. By that I mean people import a library, hit Run in their Jupyter notebook, and get a working model, without any idea of how the dataset was even created or how the model gets deployed. This is my blog and my rant; you can be annoyed by it, or you can learn. Learn, and actually know what you are doing end to end, instead of calling yourself a machine learning engineer while never leaving the notebook.
Machine learning models are useful only when they can be deployed and accessed in real-world applications. This guide will walk through training a simple model and deploying it using FastAPI.
Bruv, before installing these dependencies you should make a virtual environment for your Python, so you do not pollute your whole system. If you are on Windows, I am sorry, you are in the wrong field. If you are on Linux, you already know what to do.
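For completeness, a minimal sketch on Linux or macOS (the environment name `venv` is just my habit; call it whatever you like):
python3 -m venv venv
source venv/bin/activate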
Install Dependencies
pip install fastapi uvicorn scikit-learn joblib numpy pandas
Train a Simple Model
Create a file `train.py` to generate data, train a model, and save it, or run the same code in a Jupyter notebook to get a model.
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
import joblib
# Generate a simple dataset
data_size = 100
np.random.seed(42)
X = np.random.rand(data_size, 1) * 10
y = 2 * X.squeeze() + 3 + np.random.randn(data_size) * 2
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Train a simple model
model = LinearRegression()
model.fit(X_train, y_train)
# Save the trained model
joblib.dump(model, 'model.pkl')
This saves the model as `model.pkl`. A `.pkl` file is a pickle: like real-world pickling, where you preserve mangoes and lemons to eat later, except here we preserve the model to use later. Same idea.
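If you want to convince yourself that the pickle round-trips, here is a quick sanity check (assuming you run it in the same directory where `model.pkl` was saved):
import joblib
import numpy as np

# Reload the saved model and predict for a single input value
model = joblib.load('model.pkl')
print(model.predict(np.array([[5.0]])))  # should be roughly 2*5 + 3 = 13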
Create a FastAPI App
FastAPI was created by Sebastián Ramírez (tiangolo). Do you think you will ever be as smart as him? You use what he builds; you could not even quiz him in an interview, because he made the stuff you rely on.
Create a file `app.py`:
from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import numpy as np
# Initialize FastAPI app
app = FastAPI()
# Load trained model
model = joblib.load('model.pkl')
# Define request model
class ModelInput(BaseModel):
    features: list[float]

@app.get('/')
def home():
    return {"message": "FastAPI model is running!"}

@app.post('/predict')
def predict(data: ModelInput):
    features = np.array(data.features).reshape(1, -1)  # Ensure correct shape
    prediction = model.predict(features)
    return {"prediction": prediction.tolist()}
Run the FastAPI Server
Uvicorn is a minimal ASGI server implementation for Python. We could have done all of this with Node.js, but this is a Python post, so we stay in Python.
Start the server using Uvicorn:
uvicorn app:app --host 0.0.0.0 --port 8000 --reload
Test the API
Using Swagger UI
Go to http://127.0.0.1:8000/docs
Using cURL (Windows users may need to adjust the quoting):
curl -X 'POST' 'http://127.0.0.1:8000/predict' -H 'Content-Type: application/json' -d '{"features": [5.0]}'
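The same call from Python, a small sketch assuming the server above is running on port 8000 and `requests` is installed (`pip install requests`):
import requests

# Hit the /predict endpoint with one feature value
resp = requests.post("http://127.0.0.1:8000/predict", json={"features": [5.0]})
print(resp.json())  # something like {"prediction": [12.9]}; roughly 2*5 + 3 given how the toy data was generated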
Important observation
This way of offering a model as a service, "MaaS" I'll call it (because this is my blog and Aman likes to talk in the third person, why not?), is model agnostic. Any model can be pickled, and the input features can be shaped however you like inside the application. So we have a clean separation between creating a model and serving it, one that does not depend on the dataset. Nice!
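To make the "model agnostic" claim concrete, here is a sketch (not part of the guide above) where only the training side changes and `app.py` stays untouched; `RandomForestRegressor` is just an arbitrary stand-in:
from sklearn.ensemble import RandomForestRegressor
import joblib
import numpy as np

# Train a completely different model on the same toy data
X = np.random.rand(100, 1) * 10
y = 2 * X.squeeze() + 3 + np.random.randn(100) * 2
model = RandomForestRegressor(n_estimators=100).fit(X, y)

# Overwrite model.pkl; the FastAPI app loads and serves it without any change
joblib.dump(model, 'model.pkl')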
Real Life Implementation
So, real life isn't fun and games. You won't have a ready-made dataset, and the people giving you data often won't know themselves what the dataset is about or how any of it works. I am going to present a problem and its solution, one I am still a bit too proud of (it is 1 a.m. and I have had far too much coffee, so bear with me).
Problem: Designing Antennas
We all take antennas for granted. Your cell phone, PC, laptop; anything wireless uses an antenna. An antenna converts electrical signals into electromagnetic waves that can travel without any physical link: Wi-Fi hotspots, Bluetooth and so on.
See that zigzag trace on an ESP32 board? That is an antenna, specifically a microstrip patch antenna. That structure is what does the wireless part (how exactly is a story for another day). Designing these antennas is difficult, because you do not know in advance which shape gives you a specific behaviour. There are a few key properties of an antenna: S11, gain and frequency. Others exist, but we do not care about them here.
S11: how much of the power fed to the antenna is reflected back. The lower, the better.
Gain: how directional the antenna's radiation is.
Frequency: the frequency at which the antenna resonates.
The shape, structure and size of the antenna decide these properties, but simulating each design iteration takes a long time, let alone fabricating and testing it.
But what if we had enough data for a specific design that we could interpolate between the points and find the perfect antenna?
We can. Simulations can be run in software such as CST Studio and Ansys, the results exported, and then we do the machine learning.
Unfortunately, it is not that easy. The data those tools export is messy; you need to turn it into a proper dataset with feature and output vectors, basically turning the raw dump into a proper supervised learning task. This is the first stage of any pipeline. Here is what the raw data looks like:
S11
#Parameters = {fl=11; fw=1.5; gt=0.4; inl=2; inw=0.15; k=4.5; k_mul=0.5; pl=13; pt=0.4; pw=23.7; sl=35; st=3; sw=55; x=1}
#"Frequency / THz" "S1,1 (1) [Magnitude]"
#----------------------------------------
0.10000000000000 -0.21915244880893
0.10490000000000 -0.22501818589599
0.10980000000000 -0.23097363186125
0.11470000000000 -0.23701309451453
0.11960000000000 -0.24313087781245
0.12450000000000 -0.24932128544623
0.12940000000000 -0.25557862453194
0.13430000000000 -0.26189720940188
Gain
#Parameters = {fl=11; fw=1.5; gt=0.4; inl=2; inw=0.15; k=4.5; k_mul=0.5; pl=13; pt=0.4; pw=23.7; sl=35; st=3; sw=55; x=1}
#"Frequency / THz" "Gain (IEEE),3D,Max. Value (Solid Angle) (1) [Real Part]"
#---------------------------------------------------------------------------
0.10000000000000 -43.803419911702
0.59000000000000 -15.000965803464
1.0800000000000 -8.9299558451649
1.5700000000000 -14.278192668691
2.0600000000000 -9.4267861530604
2.5500000000000 -6.0815920991222
3.0400000000000 -1.6797698165974
3.5300000000000 1.9915585100194
Pretty much un-learnable in this form. What we need to do is extract each training example from this raw dump and build a vector with the frequency, gain, S11 and the parameters that were varied.
Doing this manually is labour intensive and error prone: it took three people a week, working 12 hours a day, to get 1000 training examples out of this. A pipeline I built does the same job in about 5 seconds. Cool, isn't it?
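To make the target concrete before the full script, here is a tiny sketch (not the pipeline itself) of pulling the swept parameters out of one `#Parameters = {...}` header line like the ones shown above:
# One header line from the raw CST export shown above
line = "#Parameters = {fl=11; fw=1.5; gt=0.4; inl=2; inw=0.15; k=4.5; k_mul=0.5; pl=13; pt=0.4; pw=23.7; sl=35; st=3; sw=55; x=1}\n"

# Strip the leading '#Parameters = {' and the trailing '}\n', then split on '; '
params = {p.split("=")[0]: float(p.split("=")[1]) for p in line[15:-2].split("; ")}
print(params)  # {'fl': 11.0, 'fw': 1.5, 'gt': 0.4, ...}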
Let's take a deep dive (as ChatGPT loves to say) into the dataset creator:
from __future__ import division
import customtkinter
from customtkinter import filedialog
import numpy as np
import csv
customtkinter.set_appearance_mode("dark")
customtkinter.set_default_color_theme("dark-blue")
def extract(
s11_filename,
gain_filename,
save_file,
bool_get_s11,
bool_get_gain,
bool_get_freq,
bool_get_bandwidth):
def interpolated_intercepts(x, y1, y2):
"""Find the intercepts of two curves, given by the same x data"""
def intercept(point1, point2, point3, point4):
"""find the intersection between two lines
the first line is defined by the line between point1 and point2
the second line is defined by the line between point3 and point4
each point is an (x,y) tuple.
So, for example, you can find the intersection between
intercept((0,0), (1,1), (0,1), (1,0)) = (0.5, 0.5)
Returns: the intercept, in (x,y) format
"""
def line(p1, p2):
A = (p1[1] - p2[1])
B = (p2[0] - p1[0])
C = (p1[0]*p2[1] - p2[0]*p1[1])
return A, B, -C
def intersection(L1, L2):
D = L1[0] * L2[1] - L1[1] * L2[0]
Dx = L1[2] * L2[1] - L1[1] * L2[2]
Dy = L1[0] * L2[2] - L1[2] * L2[0]
x = Dx / D
y = Dy / D
return x,y
L1 = line([point1[0],point1[1]], [point2[0],point2[1]])
L2 = line([point3[0],point3[1]], [point4[0],point4[1]])
R = intersection(L1, L2)
return R
idxs = np.argwhere(np.diff(np.sign(y1 - y2)) != 0)
xcs = []
ycs = []
for idx in idxs:
xc, yc = intercept((x[idx], y1[idx]),((x[idx+1], y1[idx+1])), ((x[idx], y2[idx])), ((x[idx+1], y2[idx+1])))
xcs.append(xc)
ycs.append(yc)
return np.array(xcs), np.array(ycs)
with open(s11_filename, "r") as f:
s11_file = [x for x in f.readlines()]
with open(gain_filename, "r") as f:
gain_file = [x for x in f.readlines()]
temp = []
# get the offsets
for entry in s11_file[1:]:
if "Parameters" not in entry:
temp.append(entry)
else:
break
s11_offset_multiplier = len(temp) + 1
temp = []
# get the offsets
for entry in gain_file[1:]:
if "Parameters" not in entry:
temp.append(entry)
else:
break
gain_offset_multiplier = len(temp) + 1
temp = s11_file[0]
headers = [x.split("=")[0] for x in temp[15:-2].split("; ")]
if bool_get_freq: headers.append("freq")
if bool_get_s11: headers.append("s11")
if bool_get_gain: headers.append("gain")
if bool_get_bandwidth: headers.append("bandwidth")
with open(save_file, "a", newline='') as f:
writer = csv.writer(f, delimiter=',')
writer.writerow(headers)
def get_one(chunk_s11, chunk_gain):
freqs1 = [float(x[:-1].split('\t')[0]) for x in chunk_gain[3:]]
gains = [float(x[:-1].split('\t')[1]) for x in chunk_gain[3:]]
freqs2 = [float(x[:-1].split('\t')[0]) for x in chunk_s11[3:]]
s11s = [float(x[:-1].split('\t')[1]) for x in chunk_s11[3:]]
min_s11 = min(s11s)
freq = freqs2[s11s.index(min_s11)]
gain = np.interp(freq, freqs1, gains)
row = [float(x.split("=")[1]) for x in chunk_s11[0][15:-2].split("; ")]
if bool_get_freq: row.append(freq)
if bool_get_s11: row.append(min_s11)
if bool_get_gain: row.append(gain)
const = []
for i in range(len(freqs2)):
const.append(-10)
s11_crossers, vals = interpolated_intercepts(np.array(freqs2), np.array(const), np.array(s11s))
d = [x[0] for x in s11_crossers]
if len(d) % 2 != 0:
d = d[:-1]
if bool_get_bandwidth:
for i in range(0, len(d), 2):
if d[i] < freq < d[i + 1]:
"""importane"""
bandwidth = d[i + 1] - d[i]
break
try:
row.append(bandwidth)
except UnboundLocalError:
print(freq, d)
with open(save_file, "a", newline='') as f:
writer = csv.writer(f, delimiter=',')
writer.writerow(row)
iters = (len(s11_file) // s11_offset_multiplier)
for i in range(iters):
current_s11_data = s11_file[-1 * s11_offset_multiplier:]
current_gain_data = gain_file[-1 * (gain_offset_multiplier):]
get_one(current_s11_data, current_gain_data)
s11_file = s11_file[:-1 * s11_offset_multiplier]
gain_file = gain_file[:-1 * gain_offset_multiplier]
return 1
class ChekcBoxFrame(customtkinter.CTkFrame):
def __init__(self, master):
super().__init__(master)
# Parameters Checkbox
self.label_params = customtkinter.CTkLabel(self, text="Parameters")
self.label_params.grid(row=0, column=0, padx=10, pady=(10, 0), sticky="w")
# Checkboxes
self.checkbox_s11 = customtkinter.CTkCheckBox(self, text="S11")
self.checkbox_s11.grid(row=1, column=0, padx=10, pady=(10, 0), sticky="w")
self.checkbox_gain = customtkinter.CTkCheckBox(self, text="GAIN")
self.checkbox_gain.grid(row=1, column=1, padx=10, pady=(10, 0), sticky="w")
self.checkbox_freq = customtkinter.CTkCheckBox(self, text="FREQ")
self.checkbox_freq.grid(row=2, column=0, padx=10, pady=(10, 0), sticky="w")
self.checkbox_bandwidth = customtkinter.CTkCheckBox(self, text="BANDWIDTH")
self.checkbox_bandwidth.grid(row=2, column=1, padx=10, pady=(10, 0), sticky="w")
def return_checkboxes(self):
return (self.checkbox_s11, self.checkbox_gain, self.checkbox_freq, self.checkbox_bandwidth)
class ButtonFrame(customtkinter.CTkFrame):
def __init__(self, master):
super().__init__(master)
self.s11_button = customtkinter.CTkButton(self, text="S11 File", command=self.select_s11_file)
self.s11_button.grid(row=0, column=0, padx=10, pady=(10, 0), sticky="w")
self.gain_button = customtkinter.CTkButton(self, text="Gain File", command=self.select_gain_file)
self.gain_button.grid(row=1, column=0, padx=10, pady=(10, 0), sticky="w")
self.savefile_button = customtkinter.CTkButton(self, text="Result File", command=self.select_result_file)
self.savefile_button.grid(row=2, column=0, padx=10, pady=(10, 0), sticky="w")
self.s11_file_name = ""
self.gain_file_name = ""
self.result_file_name = ""
# File Text Boxes
self.text_box_s11 = customtkinter.CTkTextbox(self, height=1, width=200)
self.text_box_s11.grid(row=3, column=1, padx=10, pady=(10, 0), sticky="w")
self.text_box_gain = customtkinter.CTkTextbox(self, height=1, width=200)
self.text_box_gain.grid(row=4, column=1, padx=10, pady=(10, 0), sticky="w")
self.text_box_result = customtkinter.CTkTextbox(self, height=1, width=200)
self.text_box_result.grid(row=5, column=1, padx=10, pady=(10, 0), sticky="w")
# File Labels
self.label_s11_file = customtkinter.CTkLabel(self, text="S11 File: ")
self.label_s11_file.grid(row=3, column=0, padx=10, pady=(10, 0), sticky="w")
self.label_gain_file = customtkinter.CTkLabel(self, text="Gain File: ")
self.label_gain_file.grid(row=4, column=0, padx=10, pady=(10, 0), sticky="w")
self.label_save_file = customtkinter.CTkLabel(self, text="Save File: ")
self.label_save_file.grid(row=5, column=0, padx=10, pady=(10, 0), sticky="w")
def select_s11_file(self):
self.s11_file_name = filedialog.askopenfilename()
self.text_box_s11.insert("0.0", text = self.s11_file_name)
def select_gain_file(self):
self.gain_file_name = filedialog.askopenfilename()
self.text_box_gain.insert("0.0", text = self.gain_file_name)
def select_result_file(self):
self.result_file_name = filedialog.askopenfilename()
self.text_box_result.insert("0.0", text = self.result_file_name)
def return_file_names(self):
return (self.s11_file_name, self.gain_file_name, self.result_file_name)
class App(customtkinter.CTk):
def __init__(self):
super().__init__()
self.title("CST Studio 2022 Data Pipeline 0.1.2")
self.geometry("700x500")
self.grid_columnconfigure(0, weight=1)
self.grid_rowconfigure((0, 0), weight=1)
self.checkbox_frame = ChekcBoxFrame(self)
self.checkbox_frame.grid(row=0, column=0, padx=10, pady=(10, 10), sticky="news")
self.button_frame = ButtonFrame(self)
self.button_frame.grid(row=0, column=1, padx=10, pady=(10, 10), sticky="news")
self.button_compute = customtkinter.CTkButton(self, text="Calculate", command=self.perform_task)
self.button_compute.grid(row=1, column=0, padx=10, pady=(10, 10), sticky="news")
self.label_desc = customtkinter.CTkLabel(self, text="©Vajradevam S., Neptunes Aerospace, 2024")
self.label_desc.grid(row=1, column=1, padx=10, pady=(10, 10), sticky="we")
def perform_task(self):
checkboxes = self.checkbox_frame.return_checkboxes()
bool_get_s11 = checkboxes[0].get()
bool_get_gain = checkboxes[1].get()
bool_get_freq = checkboxes[2].get()
bool_get_bandwidth = checkboxes[3].get()
filenames = self.button_frame.return_file_names()
s11_file_name = filenames[0]
gain_file_name = filenames[1]
result_file_name = filenames[2]
status = extract(
s11_filename=s11_file_name,
gain_filename=gain_file_name,
save_file=result_file_name,
bool_get_s11=bool_get_s11,
bool_get_gain=bool_get_gain,
bool_get_freq=bool_get_freq,
bool_get_bandwidth=bool_get_bandwidth)
app = App()
app.mainloop()
Whoa, this is too much, isn't it?
Let's take a look at the core functional block:
def extract(
s11_filename,
gain_filename,
save_file,
bool_get_s11,
bool_get_gain,
bool_get_freq,
bool_get_bandwidth):
def interpolated_intercepts(x, y1, y2):
"""Find the intercepts of two curves, given by the same x data"""
def intercept(point1, point2, point3, point4):
"""find the intersection between two lines
the first line is defined by the line between point1 and point2
the second line is defined by the line between point3 and point4
each point is an (x,y) tuple.
So, for example, you can find the intersection between
intercept((0,0), (1,1), (0,1), (1,0)) = (0.5, 0.5)
Returns: the intercept, in (x,y) format
"""
def line(p1, p2):
A = (p1[1] - p2[1])
B = (p2[0] - p1[0])
C = (p1[0]*p2[1] - p2[0]*p1[1])
return A, B, -C
def intersection(L1, L2):
D = L1[0] * L2[1] - L1[1] * L2[0]
Dx = L1[2] * L2[1] - L1[1] * L2[2]
Dy = L1[0] * L2[2] - L1[2] * L2[0]
x = Dx / D
y = Dy / D
return x,y
L1 = line([point1[0],point1[1]], [point2[0],point2[1]])
L2 = line([point3[0],point3[1]], [point4[0],point4[1]])
R = intersection(L1, L2)
return R
idxs = np.argwhere(np.diff(np.sign(y1 - y2)) != 0)
xcs = []
ycs = []
for idx in idxs:
xc, yc = intercept((x[idx], y1[idx]),((x[idx+1], y1[idx+1])), ((x[idx], y2[idx])), ((x[idx+1], y2[idx+1])))
xcs.append(xc)
ycs.append(yc)
return np.array(xcs), np.array(ycs)
with open(s11_filename, "r") as f:
s11_file = [x for x in f.readlines()]
with open(gain_filename, "r") as f:
gain_file = [x for x in f.readlines()]
temp = []
# get the offsets
for entry in s11_file[1:]:
if "Parameters" not in entry:
temp.append(entry)
else:
break
s11_offset_multiplier = len(temp) + 1
temp = []
# get the offsets
for entry in gain_file[1:]:
if "Parameters" not in entry:
temp.append(entry)
else:
break
gain_offset_multiplier = len(temp) + 1
temp = s11_file[0]
headers = [x.split("=")[0] for x in temp[15:-2].split("; ")]
if bool_get_freq: headers.append("freq")
if bool_get_s11: headers.append("s11")
if bool_get_gain: headers.append("gain")
if bool_get_bandwidth: headers.append("bandwidth")
with open(save_file, "a", newline='') as f:
writer = csv.writer(f, delimiter=',')
writer.writerow(headers)
def get_one(chunk_s11, chunk_gain):
freqs1 = [float(x[:-1].split('\t')[0]) for x in chunk_gain[3:]]
gains = [float(x[:-1].split('\t')[1]) for x in chunk_gain[3:]]
freqs2 = [float(x[:-1].split('\t')[0]) for x in chunk_s11[3:]]
s11s = [float(x[:-1].split('\t')[1]) for x in chunk_s11[3:]]
min_s11 = min(s11s)
freq = freqs2[s11s.index(min_s11)]
gain = np.interp(freq, freqs1, gains)
row = [float(x.split("=")[1]) for x in chunk_s11[0][15:-2].split("; ")]
if bool_get_freq: row.append(freq)
if bool_get_s11: row.append(min_s11)
if bool_get_gain: row.append(gain)
const = []
for i in range(len(freqs2)):
const.append(-10)
s11_crossers, vals = interpolated_intercepts(np.array(freqs2), np.array(const), np.array(s11s))
d = [x[0] for x in s11_crossers]
if len(d) % 2 != 0:
d = d[:-1]
if bool_get_bandwidth:
for i in range(0, len(d), 2):
if d[i] < freq < d[i + 1]:
"""importane"""
bandwidth = d[i + 1] - d[i]
break
try:
row.append(bandwidth)
except UnboundLocalError:
print(freq, d)
with open(save_file, "a", newline='') as f:
writer = csv.writer(f, delimiter=',')
writer.writerow(row)
iters = (len(s11_file) // s11_offset_multiplier)
for i in range(iters):
current_s11_data = s11_file[-1 * s11_offset_multiplier:]
current_gain_data = gain_file[-1 * (gain_offset_multiplier):]
get_one(current_s11_data, current_gain_data)
s11_file = s11_file[:-1 * s11_offset_multiplier]
gain_file = gain_file[:-1 * gain_offset_multiplier]
return 1
1. Reading Raw Data Files:
The function opens the two input files (`s11_filename` and `gain_filename`) that contain the raw data. Each file is read line by line, and the lines are stored in two lists (`s11_file` and `gain_file`).
2. Finding Offsets for Data Sections:
For both files, the function looks for the next line containing the word "Parameters". The number of lines before it determines the offset, i.e. the size of each chunk of data.
3. Creating the Header Row:
The first line of the S11 file contains the swept parameters in a `key=value` format. The keys are extracted from this line, and the columns `freq`, `s11`, `gain` and `bandwidth` are appended depending on the flags (`bool_get_freq`, `bool_get_s11`, etc.) to form the header row.
4. Opening a New CSV File:
A CSV file (`save_file`) is opened and the header row is written into it.
5. Processing Each Chunk of Data:
The data from both files is processed in chunks; each chunk corresponds to one training example. For each chunk:
- Frequency and gain values are extracted from the gain chunk.
- Frequency and `s11` values are extracted from the S11 chunk.
- The frequency with the minimum `s11` value (the resonance) is found, and the gain at that frequency is obtained by interpolating between the available gain points.
- The parameter values, frequency, `s11` and gain are combined into a single row.
6. Calculating Bandwidth (Optional):
If `bool_get_bandwidth` is set, the function finds where the `s11` curve crosses the -10 dB threshold, picks the crossing pair that brackets the resonant frequency, and appends the width of that interval as the bandwidth (see the small sketch after this list).
7. Appending Data to the CSV File:
- Once the row is complete, it is appended to the CSV file (`save_file`).
8. Repeating for All Data:
Steps 5-7 are repeated for every chunk, iterating through the data files. As each chunk is processed, it is removed from the end of the raw data lists.
9. Final Result:
- After all chunks are processed, the CSV file contains a structured dataset with columns for the swept parameters plus frequency, `s11`, gain and optionally bandwidth.
So, in summary:
- Raw data is read from the files.
- Offsets are determined to identify the data sections.
- Headers are created dynamically.
- Each chunk is processed with the necessary calculations (interpolating gain, finding the minimum `s11`, calculating bandwidth).
- The result is saved as a structured CSV dataset.
The step-by-step explanation above was generated with ChatGPT; honestly, even I no longer remember exactly how this code works. It has been more than a year since I first wrote it. Is it inefficient? Yes. Does it get the job done? Also yes. So that's that. We now have our dataset, which looks like this:
fl,fw,gt,inl,inw,k,k_mul,pl,pt,pw,sl,st,sw,x,freq,s11,gain,bandwidth
7.5,3.5,0.4,7.0,0.35,4.5,0.5,20.0,0.4,23.7,35.0,3.0,55.0,3.5,3.3634,-15.57056450687,2.658203751200566,0.18545966095168964
8.0,3.5,0.4,7.0,0.35,4.5,0.5,19.0,0.4,23.7,35.0,3.0,55.0,3.5,3.5398,-22.35795735977,4.047312520474252,0.2298745969771261
8.5,3.5,0.4,7.0,0.35,4.5,0.5,18.0,0.4,23.7,35.0,3.0,55.0,3.5,3.7211,-37.020459765932,4.51979263244319,0.261951517039972
Now we can actually do machine learning. The rest you are already familiar with.
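If you want a quick look at the cleaned dataset first, something like this works (assuming it is saved as `data.csv`, which is what the training script below expects):
import pandas as pd

# Peek at the cleaned dataset before training
df = pd.read_csv('data.csv')
print(df.shape)
print(df[['fw', 'inl', 'pl', 'x', 'freq', 's11', 'gain']].describe())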
Obviously, the dependencies first:
pip install fastapi uvicorn joblib torch numpy pydantic
We train our model using:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import torch
from torch import nn
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import joblib
# Load dataset
data = pd.read_csv('data.csv')
# Define xcols and ycols
xcols = ['fw', 'inl', 'pl', 'x']
ycols = ['freq', 's11', 'gain']
# Separate features and targets
X = data[xcols].values
y = data[ycols].values
# Split data into train, validation, and test sets (70-15-15 split) BEFORE scaling
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
# Initialize scalers
scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()
# Fit only on training data
X_train = scaler_X.fit_transform(X_train)
y_train = scaler_y.fit_transform(y_train)
# Transform validation and test data using the same scaler
X_val = scaler_X.transform(X_val)
y_val = scaler_y.transform(y_val)
X_test = scaler_X.transform(X_test)
y_test = scaler_y.transform(y_test)
# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)
import torch.nn.functional as F
class FeedForwardNN(nn.Module):
    def __init__(self, input_dim, output_dim, dropout_rate=0.3):
        super(FeedForwardNN, self).__init__()
        self.layer1 = nn.Linear(input_dim, 256)
        self.layer2 = nn.Linear(256, 256)
        self.layer3 = nn.Linear(256, 128)
        self.layer4 = nn.Linear(128, 64)
        self.layer5 = nn.Linear(64, 32)
        self.layer6 = nn.Linear(32, output_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=dropout_rate)  # Dropout with given probability

    def forward(self, x):
        x = self.relu(self.layer1(x))
        x = self.dropout(x)  # Dropout after activation
        x = self.relu(self.layer2(x))
        x = self.dropout(x)
        x = self.relu(self.layer3(x))
        x = self.dropout(x)
        x = self.relu(self.layer4(x))
        x = self.dropout(x)
        x = self.relu(self.layer5(x))
        x = self.layer6(x)  # No dropout on output layer
        return x
# Initialize model
model = FeedForwardNN(input_dim=X_train.shape[1], output_dim=y_train.shape[1])
# Define loss function and optimizer with L2 regularization (weight decay)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5) # L2 regularization
# Training loop
epochs = 500
train_losses = []
val_losses = []
for epoch in range(epochs):
    model.train()

    # Forward pass
    predictions = model(X_train)
    train_loss = criterion(predictions, y_train)

    # Backward pass
    optimizer.zero_grad()
    train_loss.backward()
    optimizer.step()

    # Validation loss
    model.eval()
    with torch.no_grad():
        val_predictions = model(X_val)
        val_loss = criterion(val_predictions, y_val)

    # Record losses
    train_losses.append(train_loss.item())
    val_losses.append(val_loss.item())

    # Print training progress
    if (epoch + 1) % 50 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss.item():.6f}, Validation Loss: {val_loss.item():.6f}")
# Plot the train and validation loss over time
plt.figure(figsize=(10, 6))
plt.plot(range(epochs), train_losses, label="Train Loss")
plt.plot(range(epochs), val_losses, label="Validation Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.title("Training vs Validation Loss with L2 Regularization")
plt.show()
# Make predictions on the test set
model.eval()
with torch.no_grad():
    y_pred = model(X_test)
# Inverse scale the predictions and true values
y_pred_inv = scaler_y.inverse_transform(y_pred.numpy())
y_test_inv = scaler_y.inverse_transform(y_test.numpy())
# Plot real vs predicted
plt.figure(figsize=(10, 6))
for i in range(y_pred_inv.shape[1]):
    plt.plot(y_test_inv[:, i], label=f'Real {ycols[i]}')
    plt.plot(y_pred_inv[:, i], label=f'Predicted {ycols[i]}', linestyle='--')
plt.xlabel("Sample Index")
plt.ylabel("Value")
plt.legend()
plt.title("Real vs Predicted Values")
plt.show()
# Calculate MSE, MAE, RMSE, R^2 on the test set
mse = mean_squared_error(y_test_inv, y_pred_inv)
mae = mean_absolute_error(y_test_inv, y_pred_inv)
rmse = np.sqrt(mse)
r2 = r2_score(y_test_inv, y_pred_inv)
print(f"MSE: {mse}")
print(f"MAE: {mae}")
print(f"RMSE: {rmse}")
print(f"R^2: {r2}")
# Also save the scalers
joblib.dump(scaler_X, 'scaler_X.pkl')
joblib.dump(scaler_y, 'scaler_y.pkl')
torch.save(model.state_dict(), 'model.pth')
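Before wiring this into an API, a quick sanity check that the saved artifacts reload cleanly. This sketch assumes the `FeedForwardNN` class from above is in scope and that `model.pth`, `scaler_X.pkl` and `scaler_y.pkl` are in the working directory:
import joblib
import numpy as np
import torch

# Rebuild the architecture and load the trained weights
model = FeedForwardNN(input_dim=4, output_dim=3)
model.load_state_dict(torch.load('model.pth'))
model.eval()

# Reload the scalers and push one known design through the network
scaler_X = joblib.load('scaler_X.pkl')
scaler_y = joblib.load('scaler_y.pkl')
sample = scaler_X.transform(np.array([[3.5, 7.0, 20.0, 3.5]]))  # fw, inl, pl, x from the first dataset row
with torch.no_grad():
    out = model(torch.tensor(sample, dtype=torch.float32)).numpy()
print(scaler_y.inverse_transform(out))  # if training went well, roughly freq 3.36, s11 -15.6, gain 2.66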
And make a FastAPI app:
import torch
import torch.nn as nn
import joblib
from fastapi import FastAPI
from pydantic import BaseModel
import numpy as np
import torch.nn.functional as F
class FeedForwardNN(nn.Module):
    def __init__(self, input_dim, output_dim, dropout_rate=0.3):
        super(FeedForwardNN, self).__init__()
        self.layer1 = nn.Linear(input_dim, 256)
        self.layer2 = nn.Linear(256, 256)
        self.layer3 = nn.Linear(256, 128)
        self.layer4 = nn.Linear(128, 64)
        self.layer5 = nn.Linear(64, 32)
        self.layer6 = nn.Linear(32, output_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=dropout_rate)  # Dropout with given probability

    def forward(self, x):
        x = self.relu(self.layer1(x))
        x = self.dropout(x)  # Dropout after activation
        x = self.relu(self.layer2(x))
        x = self.dropout(x)
        x = self.relu(self.layer3(x))
        x = self.dropout(x)
        x = self.relu(self.layer4(x))
        x = self.dropout(x)
        x = self.relu(self.layer5(x))
        x = self.layer6(x)  # No dropout on output layer
        return x
# Load the model architecture
model = FeedForwardNN(input_dim=4, output_dim=3)
model.load_state_dict(torch.load('model.pth'))
model.eval()
# Load the scalers
scaler_X = joblib.load('scaler_X.pkl')
scaler_y = joblib.load('scaler_y.pkl')
# FastAPI setup
app = FastAPI()
class InputData(BaseModel):
    fw: float
    inl: float
    pl: float
    x: float

class OutputData(BaseModel):
    freq: float
    s11: float
    gain: float
@app.post("/predict", response_model=OutputData)
def predict(input_data: InputData):
    input_array = np.array([[input_data.fw, input_data.inl, input_data.pl, input_data.x]])
    input_scaled = scaler_X.transform(input_array)
    input_tensor = torch.tensor(input_scaled, dtype=torch.float32)
    with torch.no_grad():
        prediction_scaled = model(input_tensor).numpy()
    prediction = scaler_y.inverse_transform(prediction_scaled)
    return OutputData(freq=prediction[0][0], s11=prediction[0][1], gain=prediction[0][2])
Run the FastAPI app using:
uvicorn app:app --host 0.0.0.0 --port 8000 --reload
Going to the Swagger UI, we can plug in examples from the test set and verify everything is working smoothly.
You can also use cURL.
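For example, with the parameter values from the first row of the dataset (any values inside the simulated ranges will do):
curl -X 'POST' 'http://127.0.0.1:8000/predict' -H 'Content-Type: application/json' -d '{"fw": 3.5, "inl": 7.0, "pl": 20.0, "x": 3.5}'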
And so we get a good-enough estimate of a design in milliseconds, where each simulation previously took 30-40 minutes on the same hardware, "boosting the design speed tremendously", as ChatGPT would put it.
That is all: the start to end of an ML pipeline, and trust me, if you understand this, ML jobs are yours. They will not be if all you ever do is hit Run in a notebook. Get your hands dirty, learn, enjoy.
Made with hate, By Vajra!
PS: if you want to play with the dataset and the data extractor:
extractor:
https://github.com/vajradevam/cst-studio-pipeline
datasets:
https://github.com/vajradevam/ad-lab/blob/main/day8/advanced/s11.txt
https://github.com/vajradevam/ad-lab/blob/main/day8/advanced/gain.txt
cleaned dataset:
https://github.com/vajradevam/ad-lab/blob/main/day8/advanced/data.csv
input features (basically they were varied in the simulation to generate data): xcols = ['fw', 'inl', 'pl', 'x']
output features: ycols = ['freq', 's11', 'gain']