# CSVision / CSV_rag_.py
# ShaswatSingh
# Upload 4 files
# da92c86 verified
import gradio as gr
import pandas as pd
import os
import matplotlib.pyplot as plt
import io
from PIL import Image
import base64
import re
import numpy as np
from llama_index.llms.groq import Groq
from llama_index.core.query_pipeline import (
QueryPipeline as QP,
Link,
InputComponent,
)
from llama_index.experimental.query_engine.pandas import (
PandasInstructionParser,
)
from llama_index.core import PromptTemplate
# Bundled sample datasets: display name shown in the UI -> CSV path to load.
EXAMPLE_DATASETS = {"Hotel Bookings": "hotel_bookings.csv"}
def load_dataframe(file_path):
    """Read a CSV into a DataFrame and describe the outcome as text.

    ``file_path`` may be a string, which is handed to ``pd.read_csv``
    unchanged, or any other object, in which case its ``name`` attribute is
    read instead.

    Returns ``(df, message)`` on success and ``(None, message)`` when
    anything raises while resolving or reading the source.
    """
    try:
        # Non-string inputs (e.g. an uploaded-file object) carry the path in `.name`.
        csv_source = file_path if isinstance(file_path, str) else file_path.name
        frame = pd.read_csv(csv_source)
        status = (
            f"Successfully loaded dataset with {frame.shape[0]} rows "
            f"and {frame.shape[1]} columns."
        )
    except Exception as exc:
        return None, f"Error loading dataset: {str(exc)}"
    return frame, status
def create_query_pipeline(df, api_key, model="llama-3.3-70b-versatile"):
    """Assemble the two-stage query pipeline used to answer questions about ``df``.

    The first LLM call is prompted to produce a pandas expression for the
    query; its output is passed to ``PandasInstructionParser(df)``. The
    second LLM call receives the query, the generated expression and the
    parser output, and is prompted to synthesize a response.

    Returns ``(pipeline, message)``. ``pipeline`` is ``None`` when the Groq
    client could not be constructed; ``message`` then carries the error text.

    NOTE(review): the prompt asks for code suitable for ``eval()``, so the
    parser presumably executes LLM-generated code against ``df`` - confirm
    this is acceptable for the deployment.
    """
    # Build the Groq client first; nothing else is useful without it.
    try:
        groq_llm = Groq(model=model, api_key=api_key)
    except Exception as exc:
        return None, f"Error initializing Groq LLM: {str(exc)}"

    instruction_str = (
        "1. Convert the query to executable Python code using Pandas.\n"
        "2. The final line of code should be a Python expression that can be called with the `eval()` function.\n"
        "3. The code should represent a solution to the query.\n"
        "4. PRINT ONLY THE EXPRESSION.\n"
        "5. Do not quote the expression.\n"
    )
    pandas_prompt_str = (
        "You are working with a pandas dataframe in Python.\n"
        "The name of the dataframe is `df`.\n"
        "This is the result of `print(df.head())`:\n"
        "{df_str}\n\n"
        "Follow these instructions:\n"
        "{instruction_str}\n"
        "Query: {query_str}\n\n"
        "Expression:"
    )
    response_synthesis_prompt_str = (
        "Given an input question, synthesize a response from the query results.\n"
        "Query: {query_str}\n\n"
        "Pandas Instructions (optional):\n{pandas_instructions}\n\n"
        "Pandas Output: {pandas_output}\n\n"
        "Response: "
    )

    # The instructions and the dataframe preview are fixed per pipeline;
    # only `query_str` is left to be filled in at run time.
    code_prompt = PromptTemplate(pandas_prompt_str).partial_format(
        instruction_str=instruction_str, df_str=df.head(5)
    )
    pipeline_modules = {
        "input": InputComponent(),
        "pandas_prompt": code_prompt,
        "llm1": groq_llm,
        "pandas_output_parser": PandasInstructionParser(df),
        "response_synthesis_prompt": PromptTemplate(response_synthesis_prompt_str),
        "llm2": groq_llm,
    }
    pipeline = QP(modules=pipeline_modules, verbose=True)

    # Stage 1: query -> code prompt -> LLM -> parser.
    pipeline.add_chain(["input", "pandas_prompt", "llm1", "pandas_output_parser"])

    # Stage 2: feed the query, the generated code and the parser output into
    # the synthesis prompt, then hand that prompt to the second LLM call.
    synthesis_links = [
        Link("input", "response_synthesis_prompt", dest_key="query_str"),
        Link("llm1", "response_synthesis_prompt", dest_key="pandas_instructions"),
        Link("pandas_output_parser", "response_synthesis_prompt", dest_key="pandas_output"),
        Link("response_synthesis_prompt", "llm2"),
    ]
    pipeline.add_links(synthesis_links)

    return pipeline, "Query pipeline created successfully!"
# Keyword groups that route a query to one of the built-in chart types.
_TIME_QUERY_TERMS = ('trend', 'time', 'year', 'month', 'booking', 'reservation')
_DISTRIBUTION_QUERY_TERMS = ('distribution', 'histogram', 'spread')
_COMPARISON_QUERY_TERMS = ('compare', 'comparison', 'versus', 'vs', 'most', 'least')
# Substrings that mark a column as date-like for the generic time chart.
_DATE_COLUMN_HINTS = ('date', 'year', 'month', 'time', 'arrival', 'reservation')
# Calendar position of each month name, used to order the pivot columns.
_MONTH_ORDER = {
    'January': 1, 'February': 2, 'March': 3, 'April': 4, 'May': 5, 'June': 6,
    'July': 7, 'August': 8, 'September': 9, 'October': 10, 'November': 11, 'December': 12,
}


def _mentions_any(text, terms):
    """Return True when any of ``terms`` occurs as a substring of ``text``."""
    return any(term in text for term in terms)


def _pick_column(columns, query_lower):
    """Return the first column named in the query, else the first column, else None."""
    for col in columns:
        # str() keeps non-string column labels from raising on .lower().
        if str(col).lower() in query_lower:
            return col
    return columns[0] if columns else None


def _label_axes(title, xlabel, ylabel):
    """Apply the shared title/axis-label/grid styling to the current axes."""
    plt.title(title, fontsize=16)
    plt.xlabel(xlabel, fontsize=14)
    plt.ylabel(ylabel, fontsize=14)
    plt.grid(axis='y', linestyle='--', alpha=0.7)


def _try_draw(draw, label, *args):
    """Run one chart helper, logging and swallowing any failure as 'not drawn'."""
    try:
        return draw(*args)
    except Exception as e:
        print(f"Error in {label} visualization: {str(e)}")
        return False


def _draw_bookings_by_month(df):
    """Draw grouped bars of row counts per arrival year, one bar per month."""
    counts = (
        df.groupby(['arrival_date_year', 'arrival_date_month'])
        .size()
        .reset_index(name='count')
    )
    pivot = counts.pivot(index='arrival_date_year', columns='arrival_date_month', values='count')
    # Unknown month labels sort after December.
    months = sorted(counts['arrival_date_month'].unique(), key=lambda m: _MONTH_ORDER.get(m, 13))
    if not months:
        return False
    # DataFrame.plot opens its own figure here because no `ax` is passed.
    ax = pivot[months].plot(kind='bar', figsize=(14, 8), width=0.8)
    _label_axes('Bookings by Month and Year', 'Year', 'Number of Bookings')
    plt.legend(title='Month', fontsize=12)
    plt.tight_layout()
    for container in ax.containers:
        ax.bar_label(container, fontsize=9, fmt='%d')
    return True


def _draw_counts_by_column(df, date_col):
    """Draw a bar chart of row counts per distinct value of ``date_col``."""
    counts = df.groupby(date_col).size().reset_index(name='count')
    plt.figure(figsize=(12, 8), dpi=100)
    plt.bar(counts[date_col], counts['count'], color='steelblue')
    _label_axes(f'Distribution by {date_col}', date_col, 'Count')
    plt.xticks(rotation=45)
    plt.tight_layout()
    return True


def _draw_time_chart(df):
    """Draw the booking pivot if its columns exist, else counts by a date-like column."""
    if 'arrival_date_year' in df.columns and 'arrival_date_month' in df.columns:
        return _try_draw(_draw_bookings_by_month, "time", df)
    date_cols = [col for col in df.columns if _mentions_any(str(col).lower(), _DATE_COLUMN_HINTS)]
    if date_cols:
        return _try_draw(_draw_counts_by_column, "date column", df, date_cols[0])
    return False


def _draw_histogram(df, query_lower):
    """Draw a 30-bin histogram of the numeric column picked for the query."""
    numeric_cols = df.select_dtypes(include=['number']).columns.tolist()
    target_col = _pick_column(numeric_cols, query_lower)
    if target_col is None:
        return False
    plt.figure(figsize=(12, 8), dpi=100)
    plt.hist(df[target_col].dropna(), bins=30, color='steelblue', edgecolor='black', alpha=0.7)
    _label_axes(f'Distribution of {target_col}', target_col, 'Frequency')
    plt.tight_layout()
    return True


def _draw_top_categories(df, query_lower):
    """Draw bars for the ten most frequent values of the picked object column."""
    categorical_cols = df.select_dtypes(include=['object']).columns.tolist()
    target_col = _pick_column(categorical_cols, query_lower)
    if target_col is None:
        return False
    top_categories = df[target_col].value_counts().nlargest(10)
    plt.figure(figsize=(12, 8), dpi=100)
    # Labels are stringified so mixed-type category values can share one axis.
    plt.bar(top_categories.index.astype(str), top_categories.values, color='steelblue')
    _label_axes(f'Top Categories by {target_col}', target_col, 'Count')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    return True


def enhance_visualization(df, query):
    """
    Create an enhanced visualization based on the dataframe and query.

    The query text is matched (case-insensitively, by substring) against three
    keyword groups, checked in this order: time-related, distribution,
    comparison. The first matching group decides which chart is attempted.

    Args:
        df: pandas DataFrame to chart.
        query: the user's question as a string.

    Returns:
        A PIL image of the rendered chart, or None when no keyword group
        matches, no suitable column exists, or drawing fails. Failures are
        printed, never raised.

    All pyplot figures are closed before returning on every path, so a
    caller that inspects ``plt.get_fignums()`` afterwards never sees a
    leftover figure from this function.
    """
    try:
        # Close any existing figures to avoid conflicts
        plt.close('all')
        query_lower = query.lower()
        if _mentions_any(query_lower, _TIME_QUERY_TERMS):
            drawn = _draw_time_chart(df)
        elif _mentions_any(query_lower, _DISTRIBUTION_QUERY_TERMS):
            drawn = _try_draw(_draw_histogram, "distribution", df, query_lower)
        elif _mentions_any(query_lower, _COMPARISON_QUERY_TERMS):
            drawn = _try_draw(_draw_top_categories, "comparison", df, query_lower)
        else:
            drawn = False
        if not drawn:
            return None
        # Render the current figure into an in-memory PNG and wrap it as an image.
        buf = io.BytesIO()
        plt.savefig(buf, format='png')
        buf.seek(0)
        return Image.open(buf)
    except Exception as e:
        print(f"Error in enhance_visualization: {str(e)}")
        return None
    finally:
        # Runs on success, on "nothing drawn" and on error alike.
        plt.close('all')
def _render_open_figure():
    """Return the first open pyplot figure that has axes as a PIL image, else None."""
    for fig_num in plt.get_fignums():
        fig = plt.figure(fig_num)
        if not fig.axes:
            # An empty figure would only produce a blank image.
            continue
        try:
            ax = fig.axes[0]
            ax.grid(axis='y', linestyle='--', alpha=0.7)
            # Re-set existing title/labels only to enlarge their font.
            if ax.get_title():
                ax.set_title(ax.get_title(), fontsize=16)
            if ax.get_xlabel():
                ax.set_xlabel(ax.get_xlabel(), fontsize=14)
            if ax.get_ylabel():
                ax.set_ylabel(ax.get_ylabel(), fontsize=14)
            legend = ax.get_legend()
            if legend is not None:
                # Resize the existing entries instead of rebuilding the legend,
                # which would discard its title and custom handles.
                for text in legend.get_texts():
                    text.set_fontsize(12)
            fig.tight_layout()
            buf = io.BytesIO()
            fig.savefig(buf, format='png', dpi=100)
            buf.seek(0)
            return Image.open(buf)
        except Exception as e:
            # Log the error but continue without crashing
            print(f"Visualization error: {str(e)}")
            return None
    return None


def process_query(query, api_key, df, model_choice):
    """Answer ``query`` about ``df`` and return ``(text, image_or_None)``.

    Args:
        query: the user's question.
        api_key: Groq API key passed on to ``create_query_pipeline``.
        df: the loaded DataFrame, or None when nothing has been loaded.
        model_choice: model name passed on to ``create_query_pipeline``.

    Returns:
        A 2-tuple of the response text (or a user-facing error/help message)
        and a PIL image, or None when there is no visualization.

    The image is the one from ``enhance_visualization`` when it produced one;
    otherwise it is rendered from a pyplot figure that was opened while the
    pipeline ran, if any. Exceptions are reported in the text, not raised.
    """
    if df is None:
        return "Please load a dataset first.", None
    if not api_key:
        return "Please provide your Groq API key.", None
    if not query or not str(query).strip():
        # Avoid spending an API call on an empty question.
        return "Please enter a question.", None
    try:
        # First, try to create an enhanced visualization based on the query
        enhanced_img = enhance_visualization(df, query)
        pipeline, message = create_query_pipeline(df, api_key, model_choice)
        if pipeline is None:
            return message, None
        # Start from a clean slate so that any figure found afterwards was
        # opened during the pipeline run, not left over from earlier work.
        plt.close('all')
        response = pipeline.run(query_str=query)
        answer = response.message.content
        if enhanced_img is not None:
            return answer, enhanced_img
        return answer, _render_open_figure()
    except Exception as e:
        return f"Error processing query: {str(e)}", None
    finally:
        # Free figures on every exit path (success, early return, error).
        plt.close('all')
def handle_example_selection(example_name):
    """Load the example dataset chosen in the dropdown.

    Args:
        example_name: a key of ``EXAMPLE_DATASETS`` (or anything else, e.g.
            None when nothing is selected).

    Returns:
        ``(df, status_message, preview_update)`` for the three UI outputs.
        ``df`` is None and the preview is cleared when the name is unknown
        or the file could not be loaded; the status message then explains why.
    """
    if example_name not in EXAMPLE_DATASETS:
        return None, "Please select a valid example dataset.", gr.update(value="")
    df, message = load_dataframe(EXAMPLE_DATASETS[example_name])
    if df is None:
        # Loading failed: surface the loader's error instead of calling
        # .head() on None.
        return None, message, gr.update(value="")
    return df, message, gr.update(value=f"Dataset preview:\n{df.head().to_string()}")
def handle_file_upload(file):
    """Load the CSV supplied through the upload widget.

    Args:
        file: the uploaded file object, or None when nothing was uploaded.

    Returns:
        ``(df, status_message, preview_update)`` for the three UI outputs.
        ``df`` is None and the preview is cleared when no file was given or
        the file could not be loaded; the status message then explains why.
    """
    if file is None:
        return None, "No file uploaded.", gr.update(value="")
    df, message = load_dataframe(file)
    if df is None:
        # Loading failed (e.g. malformed CSV): show the loader's error
        # instead of calling .head() on None.
        return None, message, gr.update(value="")
    return df, message, gr.update(value=f"Dataset preview:\n{df.head().to_string()}")
# Create Gradio interface.
# Components are declared in the order they appear on the page, followed by
# the click handlers that connect them to the functions defined earlier in
# this file.
# NOTE(review): the nesting below was reconstructed from a copy whose
# indentation had been lost - confirm the container each component belongs to.
with gr.Blocks(title="Pandas Data Analysis with Groq LLM") as app:
    gr.Markdown("# Pandas Data Analysis with Groq LLM")
    gr.Markdown("Upload your CSV data or choose an example dataset, then ask questions about it.")
    # State variables: holds whatever the load handlers return as their first
    # output (None until a dataset has been loaded).
    df_state = gr.State(value=None)
    with gr.Row():
        # Column 1: dataset selection and Groq configuration.
        with gr.Column(scale=1):
            with gr.Group():
                gr.Markdown("### Data Selection")
                with gr.Tab("Upload Data"):
                    file_input = gr.File(label="Upload CSV File", file_types=[".csv"])
                    upload_button = gr.Button("Load Uploaded Data")
                with gr.Tab("Example Datasets"):
                    example_dropdown = gr.Dropdown(
                        choices=list(EXAMPLE_DATASETS.keys()),
                        label="Select Example Dataset"
                    )
                    example_button = gr.Button("Load Example Dataset")
                # Receives the status message from either load handler.
                data_status = gr.Textbox(label="Data Loading Status", interactive=False)
            with gr.Group():
                gr.Markdown("### Groq API Configuration")
                api_key = gr.Textbox(
                    label="Enter your Groq API Key",
                    placeholder="gsk_...",
                    type="password"
                )
                model_choice = gr.Dropdown(
                    choices=["llama-3.3-70b-versatile", "mixtral-8x7b-32768", "gemma-7b-it"],
                    value="llama-3.3-70b-versatile",
                    label="Select Groq Model"
                )
        # Column 2: dataset preview, question box and results.
        with gr.Column(scale=1):
            data_preview = gr.Textbox(label="Dataset Preview", interactive=False, lines=10)
            query_input = gr.Textbox(
                label="Ask a question about your data",
                placeholder="e.g., What is the trend of monthly bookings over time?",
                lines=2
            )
            query_button = gr.Button("Submit Query")
            # Output display with tabs for text and visualization
            with gr.Tabs():
                with gr.TabItem("Text Response"):
                    response_output = gr.Textbox(label="Response", interactive=False, lines=10)
                with gr.TabItem("Visualization"):
                    image_output = gr.Image(label="Data Visualization", interactive=False)
    # Handle events.
    # Both load buttons write to the same three outputs: the dataframe state,
    # the status box and the preview box.
    upload_button.click(
        handle_file_upload,
        inputs=[file_input],
        outputs=[df_state, data_status, data_preview]
    )
    example_button.click(
        handle_example_selection,
        inputs=[example_dropdown],
        outputs=[df_state, data_status, data_preview]
    )
    # The input order matches process_query(query, api_key, df, model_choice).
    query_button.click(
        process_query,
        inputs=[query_input, api_key, df_state, model_choice],
        outputs=[response_output, image_output]
    )
    # Static usage notes shown below the interface.
    gr.Markdown("""
### Instructions
1. Upload your CSV file or select an example dataset
2. Enter your Groq API key (get one at [https://console.groq.com](https://console.groq.com))
3. Ask questions about your data in natural language
4. Get AI-powered insights and visualizations based on your data
### Example Questions
- What is the trend of monthly bookings over time?
- What's the distribution of stay duration?
- Which country has the most bookings?
- Is there a correlation between lead time and cancellations?
- Show me bookings by month and year
""")
# Launch the app only when this file is run as a script.
if __name__ == "__main__":
    app.launch()