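"""Streamlit page for the Reddit scraper.

Provides the search sidebar, result filtering and detail views, CSV/JSON
export, search history, and Reddit API credential management. Page
configuration and shared session state are handled in app.py.
"""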
# Add warning suppression at the very beginning, before any other imports
import warnings
warnings.filterwarnings("ignore", message="No secrets files found.*")

import streamlit as st
import pandas as pd
import time
import os
import json
from datetime import datetime
from dotenv import load_dotenv
from enhanced_scraper import EnhancedRedditScraper

# Disable static file serving to prevent the warning
os.environ['STREAMLIT_SERVER_ENABLE_STATIC_SERVING'] = 'false'

# Note: Page configuration and session state initialization are handled in app.py
# Functions
def initialize_scraper(client_id, client_secret, user_agent):
    """Initialize the scraper with API credentials."""
    try:
        scraper = EnhancedRedditScraper(
            client_id=client_id,
            client_secret=client_secret,
            user_agent=user_agent
        )
        st.session_state.scraper = scraper
        return True
    except Exception as e:
        st.error(f"Failed to initialize scraper: {str(e)}")
        return False
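# Note: if EnhancedRedditScraper simply wraps a praw.Reddit instance, the
# constructor above can succeed even with invalid credentials, since PRAW
# authenticates lazily; a credential error may only surface on the first
# actual API call during a search.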
def run_search(subreddits, keywords, limit, sort_by, include_comments,
               include_selftext, min_score):
    """Run the search with the provided parameters."""
    if not st.session_state.scraper:
        st.error("Scraper not initialized. Please set up API credentials first.")
        return False

    try:
        with st.spinner("Scraping Reddit..."):
            if len(subreddits) == 1:
                # Single subreddit search
                results = st.session_state.scraper.scrape_subreddit(
                    subreddit_name=subreddits[0],
                    keywords=keywords,
                    limit=limit,
                    sort_by=sort_by,
                    include_comments=include_comments,
                    include_selftext=include_selftext,
                    min_score=min_score
                )
                st.session_state.results = {subreddits[0]: results}
            else:
                # Multiple subreddit search
                results = st.session_state.scraper.search_multiple_subreddits(
                    subreddits=subreddits,
                    keywords=keywords,
                    limit=limit,
                    sort_by=sort_by,
                    include_comments=include_comments,
                    include_selftext=include_selftext,
                    min_score=min_score
                )
                st.session_state.results = results

        # Add to search history
        search_info = {
            'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'subreddits': subreddits,
            'keywords': keywords,
            'total_results': sum(len(posts) for posts in st.session_state.results.values())
        }
        st.session_state.search_history.append(search_info)
        return True
    except Exception as e:
        st.error(f"Search failed: {str(e)}")
        return False
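# Shape of st.session_state.results after a successful search (both branches
# above produce the same structure): {subreddit_name: [post_dict, ...]}, where
# each post_dict carries at least 'title', 'author', 'score', 'num_comments',
# 'created_utc', 'permalink', 'url', 'text', and optionally 'matching_comments'.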
def filter_results(results, filters):
    """Apply post-search filters to a {subreddit: [posts]} mapping."""
    filtered = {}
    for subreddit, posts in results.items():
        filtered_posts = []
        for post in posts:
            # Apply score filter
            if post['score'] < filters['min_score']:
                continue

            # Apply date filters if set. st.date_input returns datetime.date,
            # so compare against the date part of the post's timestamp.
            if filters['date_from'] or filters['date_to']:
                post_date = datetime.strptime(post['created_utc'], '%Y-%m-%d %H:%M:%S').date()
                if filters['date_from'] and post_date < filters['date_from']:
                    continue
                if filters['date_to'] and post_date > filters['date_to']:
                    continue

            # Filter for posts with matching comments if requested
            if filters['show_only_with_comments'] and not post.get('matching_comments'):
                continue

            filtered_posts.append(post)
        filtered[subreddit] = filtered_posts
    return filtered
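# Example filters dict, mirroring the defaults initialized in main() (the
# values here are illustrative):
#
#   filter_results(st.session_state.results, {
#       'min_score': 10,                 # drop posts below 10 upvotes
#       'date_from': None,               # or a datetime.date from st.date_input
#       'date_to': None,
#       'show_only_with_comments': True  # keep only posts with matching comments
#   })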
# Visualization function has been removed
def main():
    # Suppress the "No secrets files found" warning
    warnings.filterwarnings("ignore", message="No secrets files found.*")

    # Ensure session state variables are initialized
    if 'results' not in st.session_state:
        st.session_state['results'] = None
    if 'scraper' not in st.session_state:
        st.session_state['scraper'] = None
    if 'search_history' not in st.session_state:
        st.session_state['search_history'] = []
    if 'filters' not in st.session_state:
        st.session_state['filters'] = {
            'min_score': 0,
            'date_from': None,
            'date_to': None,
            'show_only_with_comments': False
        }
    # Header using Streamlit's native heading components
    st.title("Reddit Scraper")
    st.header("Data Collection Tool")

    # Sidebar for configuration
    with st.sidebar:
        st.header("Configuration")

        # Search parameters
        st.subheader("Search Parameters")

        # Multiple subreddit input
        subreddits_input = st.text_area("Subreddits (one per line)", value="cuny\ncollegequestions")
        subreddits = [s.strip() for s in subreddits_input.split("\n") if s.strip()]

        # Keywords input
        keywords_input = st.text_area("Keywords (one per line)", value="question\nhelp\nconfused")
        keywords = [k.strip() for k in keywords_input.split("\n") if k.strip()]

        # Other parameters
        limit = st.slider("Number of posts to scan per subreddit", 10, 200, 50)
        sort_by = st.selectbox("Sort posts by", ["hot", "new", "top", "rising"], index=0)
        include_selftext = st.checkbox("Include post content in search", value=True)
        include_comments = st.checkbox("Include comments in search", value=True)
        min_score = st.slider("Minimum score (upvotes)", 0, 1000, 0)

        # Action buttons
        search_col, clear_col = st.columns(2)
        with search_col:
            search_button = st.button("Run Search", type="primary", use_container_width=True)
        with clear_col:
            clear_button = st.button("Clear Results", type="secondary", use_container_width=True)

    # Main interface tabs
    tab1, tab2, tab3, tab4 = st.tabs(["Results", "Export", "History", "API Credentials"])
    # Handle actions
    if clear_button:
        st.session_state.results = None
        st.rerun()

    if search_button:
        if not subreddits:
            st.error("Please enter at least one subreddit to search.")
        elif not keywords:
            st.error("Please enter at least one keyword to search.")
        else:
            success = run_search(
                subreddits=subreddits,
                keywords=keywords,
                limit=limit,
                sort_by=sort_by,
                include_comments=include_comments,
                include_selftext=include_selftext,
                min_score=min_score
            )
            if success:
                st.success(f"Search completed! Found results in {len(st.session_state.results)} subreddits.")
    # Tab 1: Results
    with tab1:
        if st.session_state.results:
            # Post-search filters
            st.markdown('<div class="card">', unsafe_allow_html=True)
            st.subheader("Filter Results")

            filter_col1, filter_col2, filter_col3 = st.columns(3)
            with filter_col1:
                st.session_state.filters['min_score'] = st.number_input(
                    "Minimum score", min_value=0, value=st.session_state.filters['min_score'])
            with filter_col2:
                st.session_state.filters['date_from'] = st.date_input(
                    "From date", value=None)
            with filter_col3:
                st.session_state.filters['date_to'] = st.date_input(
                    "To date", value=None)

            st.session_state.filters['show_only_with_comments'] = st.checkbox(
                "Show only posts with matching comments",
                value=st.session_state.filters['show_only_with_comments'])
            apply_filters = st.button("Apply Filters")
            st.markdown('</div>', unsafe_allow_html=True)

            # Apply filters if requested
            if apply_filters:
                filtered_results = filter_results(st.session_state.results, st.session_state.filters)
            else:
                filtered_results = st.session_state.results

            # Show results for each subreddit
            total_posts = sum(len(posts) for posts in filtered_results.values())
            st.subheader(f"Search Results ({total_posts} posts found)")

            for subreddit, posts in filtered_results.items():
                with st.expander(f"r/{subreddit} - {len(posts)} posts", expanded=len(filtered_results) == 1):
                    if posts:
                        # Create a dataframe for easier viewing
                        df = pd.DataFrame([{
                            'Title': p['title'],
                            'Score': p['score'],
                            'Comments': p['num_comments'],
                            'Date': p['created_utc'],
                            'URL': p['permalink']
                        } for p in posts])
                        st.dataframe(df, use_container_width=True)

                        # Show detailed post view; posts is non-empty here, so a
                        # detail view is always available
                        st.subheader("Post Details")
                        if len(posts) == 1:
                            # For a single post, no need for a slider
                            post_index = 0
                            st.info("Displaying the only post found.")
                        else:
                            # For multiple posts, create a slider
                            post_index = st.slider(f"Select post from r/{subreddit} ({len(posts)} posts)",
                                                   0, len(posts) - 1, 0)
                        post = posts[post_index]

                        # Display post details in a card
                        st.markdown('<div class="card">', unsafe_allow_html=True)
                        st.markdown(f"### {post['title']}")
                        st.markdown(f"**Author:** u/{post['author']} | **Score:** {post['score']} | **Comments:** {post['num_comments']}")
                        st.markdown(f"**Posted on:** {post['created_utc']}")
                        st.markdown(f"**URL:** [{post['url']}]({post['url']})")

                        if post['text']:
                            st.markdown("##### Post Content")
                            with st.container():
                                show_content = st.checkbox("Show full content", key=f"content_{subreddit}_{post_index}")
                                if show_content:
                                    st.text(post['text'])

                        # Show matching comments if available
                        if 'matching_comments' in post and post['matching_comments']:
                            st.markdown(f"##### Matching Comments ({len(post['matching_comments'])})")
                            with st.container():
                                show_comments = st.checkbox("Show comments", value=True, key=f"comments_{subreddit}_{post_index}")
                                if show_comments:
                                    for i, comment in enumerate(post['matching_comments']):
                                        st.markdown(f"**u/{comment['author']}** ({comment['score']} points) - {comment['created_utc']}")
                                        st.text(comment['body'])
                                        if i < len(post['matching_comments']) - 1:
                                            st.divider()
                        st.markdown('</div>', unsafe_allow_html=True)
                    else:
                        st.info(f"No posts found in r/{subreddit} matching the current filters.")
        else:
            st.info("Configure the search parameters and click 'Run Search' to begin.")

            # Show help for first-time users
            with st.expander("Help & Tips"):
                st.markdown("""
                ### Quick Start Guide
                1. Set up your **API credentials** in the API Credentials tab
                2. Enter **subreddits** to search (one per line)
                3. Enter **keywords** to filter posts (one per line)
                4. Adjust settings as needed
                5. Click **Run Search**

                ### Search Tips
                - Use specific keywords for targeted results
                - Search multiple related subreddits for better coverage
                - Enable comment search to find keywords in discussions
                - Export data for external analysis
                """)
    # Tab 2: Export
    with tab2:
        if st.session_state.results:
            st.subheader("Export Results")

            # Apply current filters
            filtered_results = filter_results(st.session_state.results, st.session_state.filters)

            # Format selection
            export_format = st.radio("Export format", ["CSV", "JSON"], horizontal=True)

            # Filename input
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            default_filename = f"reddit_scrape_{timestamp}"
            filename = st.text_input("Filename (without extension)", value=default_filename)

            # Export button
            export_clicked = st.button("Export Data", type="primary")
            if export_clicked:
                try:
                    # Combine all results into a flat list for export
                    all_results = []
                    for subreddit, posts in filtered_results.items():
                        for post in posts:
                            post_copy = post.copy()
                            post_copy['subreddit'] = subreddit
                            all_results.append(post_copy)

                    # Save results based on the selected format
                    if export_format == "CSV":
                        # Convert to a dataframe and save
                        df = pd.DataFrame(all_results)
                        # Serialize nested structures for CSV
                        if 'matching_comments' in df.columns:
                            df['matching_comments'] = df['matching_comments'].apply(
                                lambda x: json.dumps(x) if isinstance(x, list) else ''
                            )
                        csv_file = f"{filename}.csv"
                        df.to_csv(csv_file, index=False)

                        # Create download button
                        with open(csv_file, 'rb') as f:
                            st.download_button(
                                label="Download CSV",
                                data=f,
                                file_name=csv_file,
                                mime="text/csv"
                            )
                        st.success(f"Exported {len(all_results)} posts to {csv_file}")
                    else:  # JSON
                        json_file = f"{filename}.json"
                        with open(json_file, 'w') as f:
                            json.dump(all_results, f, indent=2)

                        # Create download button
                        with open(json_file, 'rb') as f:
                            st.download_button(
                                label="Download JSON",
                                data=f,
                                file_name=json_file,
                                mime="application/json"
                            )
                        st.success(f"Exported {len(all_results)} posts to {json_file}")
                except Exception as e:
                    st.error(f"Export failed: {str(e)}")
        else:
            st.info("Run a search to export results.")
    # Tab 3: History
    with tab3:
        st.subheader("Search History")
        if st.session_state.search_history:
            for i, search in enumerate(reversed(st.session_state.search_history)):
                with st.expander(f"Search #{len(st.session_state.search_history) - i}: {search['timestamp']} ({search['total_results']} results)"):
                    st.markdown(f"**Subreddits:** {', '.join(search['subreddits'])}")
                    st.markdown(f"**Keywords:** {', '.join(search['keywords'])}")
                    st.markdown(f"**Results:** {search['total_results']} posts")
                    st.markdown(f"**Time:** {search['timestamp']}")
        else:
            st.info("No search history yet.")
    # Tab 4: API Credentials
    with tab4:
        # Initialize session state for credentials if they don't exist
        if 'client_id' not in st.session_state:
            st.session_state.client_id = ""
        if 'client_secret' not in st.session_state:
            st.session_state.client_secret = ""
        if 'user_agent' not in st.session_state:
            st.session_state.user_agent = "RedditScraperApp/1.0"

        # In a local development environment, load credentials from a .env file
        # for convenience; skip this in production to avoid credential leakage.
        is_local_dev = not os.environ.get('SPACE_ID') and not os.environ.get('SYSTEM')
        if is_local_dev:
            load_dotenv()
            # Only load from env if session state is empty (first load)
            if not st.session_state.client_id:
                st.session_state.client_id = os.environ.get("REDDIT_CLIENT_ID", "")
            if not st.session_state.client_secret:
                st.session_state.client_secret = os.environ.get("REDDIT_CLIENT_SECRET", "")
            if st.session_state.user_agent == "RedditScraperApp/1.0":
                st.session_state.user_agent = os.environ.get("REDDIT_USER_AGENT", "RedditScraperApp/1.0")

        # Two columns for instructions and input
        cred_col1, cred_col2 = st.columns([1, 1])
        with cred_col1:
            st.markdown("""
            #### Getting Credentials
            1. Go to the [Reddit Developer Portal](https://www.reddit.com/prefs/apps)
            2. Click "Create App" or "Create Another App"
            3. Fill in the details (name, description)
            4. Select "script" as the application type
            5. Use "http://localhost:8000" as the redirect URI
            6. Click "Create app"
            7. Note the client ID and secret

            #### Privacy Note
            Your credentials are never stored on any servers. For personal copies,
            you can set them as Space secrets.
            """)
        with cred_col2:
            # Use session state for the input values
            client_id = st.text_input("Client ID", value=st.session_state.client_id, key="client_id_input")
            client_secret = st.text_input("Client Secret", value=st.session_state.client_secret, type="password", key="client_secret_input")
            user_agent = st.text_input("User Agent", value=st.session_state.user_agent, key="user_agent_input")

            # Update session state when the inputs change
            st.session_state.client_id = client_id
            st.session_state.client_secret = client_secret
            st.session_state.user_agent = user_agent

            if st.button("Initialize API Connection", type="primary"):
                if initialize_scraper(client_id, client_secret, user_agent):
                    st.success("API connection established!")
                    # Set environment variables for the current session
                    os.environ["REDDIT_CLIENT_ID"] = client_id
                    os.environ["REDDIT_CLIENT_SECRET"] = client_secret
                    os.environ["REDDIT_USER_AGENT"] = user_agent
if __name__ == "__main__":
    main()
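# Example .env file for local development (the variable names match the
# os.environ lookups above; the values are placeholders):
#
#   REDDIT_CLIENT_ID=your_client_id
#   REDDIT_CLIENT_SECRET=your_client_secret
#   REDDIT_USER_AGENT=RedditScraperApp/1.0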