Solving Complex Data Pipelines with Composio + Datatune


Table of Contents:
The Data Integration Challenge
Composio: Integration Platform for AI Agents & LLMs
Datatune: Perform transformations on your data with natural language
Real-World Example: Analyzing GitHub Issues
Summary
The Data Integration Challenge
Modern businesses rely on data scattered across dozens of platforms such as CRMs, project management tools, communication platforms, databases, and APIs. The traditional approach involves:
Complex API integrations with different authentication methods
Constant maintenance as APIs change and evolve
Complicated code for data transformations that require an understanding of semantic context
What if you could connect to any external service, pull data, and transform it using nothing but natural language? Let’s see how we can engineer a data pipeline using Composio and Datatune.
Composio: Integration Platform for AI Agents & LLMs
Composio eliminates integration complexity by providing:
200+ pre-built integrations across every major platform (e.g., Salesforce, GitHub, Slack, Google Sheets, Notion)
One-click authentication handling OAuth, API keys, and complex flows
Unified interface that abstracts away API differences
Built for AI workflows with structured, consistent outputs.
Composio can also help you connect MCPs to your AI agents, so in a few steps you can avoid painful API orchestration, redundant boilerplate code, and platform-specific edge cases.
Datatune: Perform transformations on your data with natural language
One of the major complexities of data pipelines is transforming messy tabular data into clean, usable formats, especially when the transformation requires understanding the semantic meaning of the data and the task at hand.
Consider a sales spreadsheet with product names like “iPhone 15 Pro Max 256GB Blue”. Extracting just the color would normally require complex regex patterns to handle every variation. With Datatune, you simply say “Extract the color from product name” and it understands context automatically. You can then chain operations naturally: first extract colors and categories with Map, then Filter to “Keep only blue electronics,” and finally clean up the results.
This approach is powerful because each step builds on the previous one, letting you transform millions of rows by describing what you want rather than writing lengthy pandas or regex code to do the same job.
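To make the comparison concrete, here is a minimal sketch of the hand-written approach for the color example. The color list and the helper name `extract_color` are assumptions made for illustration; they are not part of Datatune.

```python
import re

# Hypothetical, hand-maintained color list: every new variant must be added here.
KNOWN_COLORS = ("blue", "black", "white", "natural titanium", "midnight")

# Longest names first so multi-word colors are tried before shorter ones.
_COLOR_RE = re.compile(
    r"\b("
    + "|".join(sorted(map(re.escape, KNOWN_COLORS), key=len, reverse=True))
    + r")\b",
    re.IGNORECASE,
)


def extract_color(product_name):
    """Return the first known color found in the product name, or None."""
    match = _COLOR_RE.search(product_name)
    return match.group(1).lower() if match else None


print(extract_color("iPhone 15 Pro Max 256GB Blue"))
print(extract_color("Galaxy S24 Ultra 512GB Titanium Violet"))
```

The first call finds "blue", while the second returns None because neither "titanium violet" nor "violet" is in the list. That maintenance burden is what a natural-language instruction is meant to remove.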
Real-World Example: Analyzing GitHub Issues
Let’s walk through a practical example where we analyze GitHub issues to help maintainers prioritize their work. We will fetch issues from PyTorch’s GitHub repository using Composio, then process the data with Datatune to find the issues that could be “good first contributions for new developers”. Here’s how simple it becomes with Composio + Datatune:
Install dependencies
Install both libraries (dask will be automatically installed with datatune) and dotenv for loading your environment variables:
pip install composio datatune python-dotenv
Setup and Configuration
We need an API key to interact with Composio. Go to https://app.composio.dev, log in, and get your API key.
To use LLMs with Datatune, you can use OpenAI, local models via Ollama, or another API provider such as Azure. For more information about using different providers, refer to this link: https://docs.datatune.ai/LLM.html
For the sake of this article, we will use Azure OpenAI as the provider.
Once you’re ready with all the credentials, create a .env file and add your environment variables like this:
COMPOSIO_API_KEY=your-composio-key
AZURE_OPENAI_API_KEY=your-key
AZURE_API_BASE=https://your-endpoint.openai.azure.com/
AZURE_API_VERSION=2024-02-01
Let’s import the libraries:
import os
import pandas as pd
import dask.dataframe as dd
import datatune as dt
from composio import ComposioToolSet, App, Action
from datatune.core.map import Map
from datatune.core.filter import Filter
from datatune.llm.llm import Azure
from dotenv import load_dotenv
load_dotenv()
COMPOSIO_API_KEY = os.getenv("COMPOSIO_API_KEY")
api_key = os.getenv("AZURE_OPENAI_API_KEY")
api_base = os.getenv("AZURE_API_BASE")
api_version = os.getenv("AZURE_API_VERSION", "2024-02-01")
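Because `os.getenv` silently returns None for a missing variable, a quick check before making any API call can save a confusing failure later. The following is a minimal sketch; the helper name `missing_env_vars` is hypothetical, and the variable names are the ones from the .env file above.

```python
import os

# Variables without a fallback default in the configuration above.
REQUIRED_VARS = ("COMPOSIO_API_KEY", "AZURE_OPENAI_API_KEY", "AZURE_API_BASE")


def missing_env_vars(required=REQUIRED_VARS, environ=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in required if not env.get(name)]


missing = missing_env_vars()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
```

Passing a plain dict as `environ` makes the helper easy to test without touching the real environment.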
Connect to GitHub with Composio
We will use Composio's ComposioToolSet to connect to the PyTorch GitHub repository. Composio exposes a set of actions for each integration. In our case, the GITHUB_LIST_REPOSITORY_ISSUES action returns the issue data we need, and the following function fetches it and converts it into a DataFrame.
Let’s get the issues from https://github.com/pytorch/pytorch, so both repo_owner and repo_name are set to ‘pytorch’:
def fetch_github_issues(toolset, repo_owner="pytorch", repo_name="pytorch", limit=30):
    result = toolset.execute_action(
        action=Action.GITHUB_LIST_REPOSITORY_ISSUES,
        params={
            "owner": repo_owner,
            "repo": repo_name,
            "state": "open",
            "per_page": limit  # GitHub caps per_page at 100
        }
    )
    # Extract issues data from result
    issues_data = []
    if isinstance(result, dict) and result.get('successful'):
        data = result.get('data', {})
        if isinstance(data, list):
            issues_data = data
        elif isinstance(data, dict):
            # Look for issues in common response patterns
            for key in ['details', 'items', 'data', 'issues', 'results']:
                if key in data and isinstance(data[key], list):
                    issues_data = data[key]
                    break
            # Check if it's a single issue object
            if not issues_data and 'number' in data and 'title' in data:
                issues_data = [data]
    elif isinstance(result, list):
        issues_data = result
    if not isinstance(issues_data, list):
        return pd.DataFrame()
    # Process issues into DataFrame
    processed_issues = []
    for i, issue in enumerate(issues_data):
        if i >= limit:
            break
        if isinstance(issue, dict):
            processed_issues.append({
                "issue_number": issue.get("number"),
                "title": issue.get("title", ""),
                # Truncate long bodies to keep prompts small
                "issue_body": issue.get("body", "")[:500] if issue.get("body") else "",
                "state": issue.get("state", ""),
                "comments_count": issue.get("comments", 0),
                "labels": [label.get("name", "") for label in issue.get("labels", [])] if issue.get("labels") else [],
                "created_at": issue.get("created_at", ""),
                "updated_at": issue.get("updated_at", ""),
                "html_url": issue.get("html_url", "")
            })
    return pd.DataFrame(processed_issues)
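One detail worth knowing: GitHub's REST endpoint for listing repository issues also returns pull requests, and marks them with a `pull_request` key. Assuming the Composio action passes that key through unchanged (an assumption not verified here), a small filter like the one below could be applied to the raw list before building the DataFrame. The helper name `drop_pull_requests` and the sample records are made up for illustration.

```python
def drop_pull_requests(issues):
    """Keep only dict items that do not carry GitHub's `pull_request` marker."""
    return [
        item for item in issues
        if isinstance(item, dict) and "pull_request" not in item
    ]


# Made-up sample records, shaped like GitHub issue objects.
sample = [
    {"number": 1, "title": "Crash on import"},
    {"number": 2, "title": "Fix crash on import", "pull_request": {"url": "https://example.invalid"}},
]
print([item["number"] for item in drop_pull_requests(sample)])
```

With the sample above, only the first record survives, since the second one carries the pull request marker.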
Transform Data with Natural Language using Datatune
The data returned by the function above contains the following columns: issue_number, title, issue_body, state, comments_count, labels, created_at, updated_at, html_url.
Instead of writing complicated Python code to edit this data, we will simply use Datatune.
We will perform two major operations chained together.
1. Map Operation: Replace values or add new columns based on existing data
In our case, we will use the map operation to classify each issue by severity, estimated effort, and issue type (bug, feature, documentation, or other), and write the results into new columns.
2. Filter Operation: Remove specific rows
We will remove the rows that are not good first issues.
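Conceptually, the two operations behave like the plain-Python sketch below, where a keyword rule stands in for the LLM. This is only an analogy for the semantics (Map adds fields to every row, Filter drops rows); it is not how Datatune is implemented, and the helper names and sample rows are made up.

```python
def map_rows(rows, fn):
    """Map: return new rows with the fields produced by fn added to each row."""
    return [{**row, **fn(row)} for row in rows]


def filter_rows(rows, predicate):
    """Filter: keep only the rows for which predicate returns True."""
    return [row for row in rows if predicate(row)]


def classify(row):
    """Stub standing in for the LLM: a keyword rule, purely illustrative."""
    is_docs = "typo" in row["title"].lower()
    return {"issue_type": "documentation" if is_docs else "other"}


rows = [
    {"issue_number": 1, "title": "Fix typo in docs"},
    {"issue_number": 2, "title": "Segfault in autograd"},
]
mapped = map_rows(rows, classify)
kept = filter_rows(mapped, lambda row: row["issue_type"] == "documentation")
print(kept)
```

The filter sees the columns the map step produced, which is what makes chaining the two operations useful.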
Let’s see how we can write prompts for Datatune to perform these operations and chain them. We will use gpt-4.1-mini as the LLM for both operations. Since Datatune uses Dask under the hood, we call the .compute() method on the Dask dataframe to trigger the data transformation. Finally, we apply dt.finalize() to clear the internal metadata created during this process.
def analyze_with_datatune(df):
    if df.empty:
        # Return a single empty DataFrame so callers can check .empty
        return pd.DataFrame()
    dask_df = dd.from_pandas(df, npartitions=1)
    llm = Azure(
        model_name="gpt-4.1-mini",
        api_key=api_key,
        api_base=api_base,
        api_version=api_version,
    )
    # Map operation: Analyze each issue
    mapped = Map(
        prompt="Based on the issue title, description, and labels, determine: 1) severity (high/medium/low) - consider critical bugs, memory leaks, crashes as high; 2) estimated effort to fix (high/medium/low); 3) issue type (bug/feature/documentation/other)",
        output_fields=["severity", "estimated_effort", "issue_type"]
    )(llm, dask_df)
    # Filter operation: Keep issues suitable for first-time contributors
    good_first_issues = Filter(
        prompt="Keep issues that look like they could be good first contributions for new developers"
    )(llm, mapped)
    return dt.finalize(good_first_issues.compute())
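Since the new columns are produced by an LLM, their values may not always match the categories requested in the prompt exactly (different casing, stray whitespace, or an unexpected label). A minimal sketch of a post-processing check follows; the helper name `normalize_row` is hypothetical, the allowed values are taken from the Map prompt, and the sample row is made up.

```python
# Allowed values, as listed in the Map prompt.
ALLOWED = {
    "severity": {"high", "medium", "low"},
    "estimated_effort": {"high", "medium", "low"},
    "issue_type": {"bug", "feature", "documentation", "other"},
}


def normalize_row(row):
    """Return (cleaned_row, invalid_fields) for one mapped record."""
    cleaned = dict(row)
    invalid = []
    for field, allowed in ALLOWED.items():
        value = str(row.get(field, "")).strip().lower()
        if value in allowed:
            cleaned[field] = value
        else:
            # Leave the original value untouched and report the field.
            invalid.append(field)
    return cleaned, invalid


# Made-up record with one sloppy value and one unexpected category.
row = {"severity": " Low", "estimated_effort": "medium", "issue_type": "enhancement"}
cleaned, invalid = normalize_row(row)
print(cleaned["severity"], invalid)
```

To apply this to a pandas DataFrame, one option is to iterate over `df.to_dict("records")` and collect the rows that report invalid fields for manual review.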
Let’s wrap everything up and take a look at the full code:
import os
import pandas as pd
import dask.dataframe as dd
from composio import ComposioToolSet, App, Action
from datatune.core.map import Map
from datatune.core.filter import Filter
from datatune.llm.llm import Azure
import datatune as dt
from dotenv import load_dotenv
load_dotenv()
# Configuration
COMPOSIO_API_KEY = os.getenv("COMPOSIO_API_KEY")
api_key = os.getenv("AZURE_OPENAI_API_KEY")
api_base = os.getenv("AZURE_API_BASE")
api_version = os.getenv("AZURE_API_VERSION", "2024-02-01")
def setup_composio():
    toolset = ComposioToolSet(api_key=COMPOSIO_API_KEY)
    return toolset
def fetch_github_issues(toolset, repo_owner="pytorch", repo_name="pytorch", limit=30):
    result = toolset.execute_action(
        action=Action.GITHUB_LIST_REPOSITORY_ISSUES,
        params={
            "owner": repo_owner,
            "repo": repo_name,
            "state": "open",
            "per_page": limit
        }
    )
    # Extract issues data from result
    issues_data = []
    if isinstance(result, dict) and result.get('successful'):
        data = result.get('data', {})
        if isinstance(data, list):
            issues_data = data
        elif isinstance(data, dict):
            # Look for issues in common response patterns
            for key in ['details', 'items', 'data', 'issues', 'results']:
                if key in data and isinstance(data[key], list):
                    issues_data = data[key]
                    break
            # Check if it's a single issue object
            if not issues_data and 'number' in data and 'title' in data:
                issues_data = [data]
    elif isinstance(result, list):
        issues_data = result
    if not isinstance(issues_data, list):
        return pd.DataFrame()
    # Process issues into DataFrame
    processed_issues = []
    for i, issue in enumerate(issues_data):
        if i >= limit:
            break
        if isinstance(issue, dict):
            processed_issues.append({
                "issue_number": issue.get("number"),
                "title": issue.get("title", ""),
                "issue_body": issue.get("body", "")[:500] if issue.get("body") else "",
                "state": issue.get("state", ""),
                "comments_count": issue.get("comments", 0),
                "labels": [label.get("name", "") for label in issue.get("labels", [])] if issue.get("labels") else [],
                "created_at": issue.get("created_at", ""),
                "updated_at": issue.get("updated_at", ""),
                "html_url": issue.get("html_url", "")
            })
    return pd.DataFrame(processed_issues)
def analyze_with_datatune(df):
    if df.empty:
        # Return a single empty DataFrame so main() can check .empty
        return pd.DataFrame()
    dask_df = dd.from_pandas(df, npartitions=1)
    llm = Azure(
        model_name="gpt-4.1-mini",
        api_key=api_key,
        api_base=api_base,
        api_version=api_version,
    )
    # Map operation: Analyze each issue
    mapped = Map(
        prompt="Based on the issue title, description, and labels, determine: 1) severity (high/medium/low) - consider critical bugs, memory leaks, crashes as high; 2) estimated effort to fix (high/medium/low); 3) issue type (bug/feature/documentation/other)",
        output_fields=["severity", "estimated_effort", "issue_type"]
    )(llm, dask_df)
    # Filter operation: Keep issues suitable for first-time contributors
    good_first_issues = Filter(
        prompt="Keep issues that look like they could be good first contributions for new developers"
    )(llm, mapped)
    final_df = good_first_issues.compute()
    return dt.finalize(final_df)
def main():
    toolset = setup_composio()
    issues_df = fetch_github_issues(toolset)
    good_first_issues = analyze_with_datatune(issues_df)
    if not good_first_issues.empty:
        good_first_issues.to_csv("good_first_issues.csv", index=False)
        print(f" - good_first_issues.csv ({len(good_first_issues)} issues)")

if __name__ == "__main__":
    main()
The results in good_first_issues.csv should look something like this:
issue_number,title,issue_body,state,comments_count,labels,created_at,updated_at,html_url,severity,estimated_effort,issue_type
45123,"Fix typo in torch.nn.functional documentation","There's a typo in the documentation for F.relu where 'activation' is misspelled as 'activaton'. This should be a simple fix...",open,2,"['good first issue', 'module: docs']",2025-01-15T14:22:31Z,2025-01-15T16:45:12Z,https://github.com/pytorch/pytorch/issues/45123,low,low,documentation
45067,"Add unit test for DataLoader pin_memory","The pin_memory functionality in DataLoader is missing unit tests. We need to add tests that verify tensors are properly pinned...",open,4,"['good first issue', 'module: tests', 'module: dataloader']",2025-01-14T09:18:55Z,2025-01-16T08:30:22Z,https://github.com/pytorch/pytorch/issues/45067,low,low,other
44982,"Update error message for mismatched tensor sizes","When tensors have mismatched sizes in operations, the error message could be clearer. Currently shows indices, but should show actual shapes...",open,1,"['good first issue', 'module: error messages']",2025-01-12T11:45:33Z,2025-01-13T10:12:44Z,https://github.com/pytorch/pytorch/issues/44982,low,low,feature
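Note that the labels column is written to the CSV as the text form of a Python list, so it comes back as a plain string when the file is read again. The sketch below shows one way to restore it using only the standard library; the helper name `load_issues` is hypothetical, and the inline sample row is made up so the example runs without the real file.

```python
import ast
import csv
import io


def load_issues(csv_file):
    """Read result rows and turn the stringified labels column back into a list."""
    rows = []
    for row in csv.DictReader(csv_file):
        # literal_eval safely parses text such as "['a', 'b']" into a list.
        row["labels"] = ast.literal_eval(row["labels"]) if row["labels"] else []
        rows.append(row)
    return rows


# Made-up sample in the same shape as the output file.
sample = io.StringIO(
    "issue_number,title,labels,issue_type\n"
    "1,\"Example title\",\"['good first issue', 'module: docs']\",documentation\n"
)
issues = load_issues(sample)
print(issues[0]["labels"])
```

For the real file, the same helper could be called as `load_issues(open("good_first_issues.csv", newline=""))`, ideally inside a `with` block.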
Summary
Together, Composio and Datatune can save many hours when engineering data pipelines: Composio abstracts away the integration architecture, and Datatune understands the semantic context needed to transform the data.
Written by Abhijith Neil Abraham
