# Spaces: Running on Zero
# File size: 6,102 Bytes
# cd0b70a
# Created by Fabio Sarracino
import os
import logging
import hashlib
import folder_paths
# Setup logging: module-level logger registered under the name "VibeVoice";
# the node below uses it to report listing and loading failures.
logger = logging.getLogger("VibeVoice")
class LoadTextFromFileNode:
    """Node that loads the content of a ``.txt`` file as a string.

    Selectable files are collected from the "input", "output" and "temp"
    directories reported by ``folder_paths`` and are identified by a
    ``"<directory>/<filename>"`` string.
    """

    # Placeholder entry offered when no text file exists in any directory.
    _NO_FILES_PLACEHOLDER = "No text files found in any directory"
    # Directory keys accepted before the first "/" of a selection.
    _SOURCE_DIRECTORIES = ("input", "output", "temp")
    # Lower-case extensions that identify a loadable text file.
    _TEXT_EXTENSIONS = (".txt",)

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)
    FUNCTION = "load_text"
    CATEGORY = "VibeVoiceWrapper"
    DESCRIPTION = "Load text content from a .txt file"

    @classmethod
    def INPUT_TYPES(cls):
        """Return the input specification: one required ``file`` choice list.

        The list contains every text file found in the known directories,
        prefixed with its directory key and sorted; when nothing is found it
        contains a single placeholder entry instead.
        """
        all_files = []
        for dir_name in cls._SOURCE_DIRECTORIES:
            all_files.extend(
                f"{dir_name}/{name}"
                for name in cls.get_files_for_directory(dir_name)
            )

        if not all_files:
            all_files = [cls._NO_FILES_PLACEHOLDER]

        return {
            "required": {
                "file": (sorted(all_files), {
                    "tooltip": "Select a text file to load (format: directory/filename)"
                }),
            }
        }

    @staticmethod
    def _get_directory_path(source_dir):
        """Map a directory key to its path on disk, or ``None`` if unknown."""
        if source_dir == "input":
            return folder_paths.get_input_directory()
        if source_dir == "output":
            return folder_paths.get_output_directory()
        if source_dir == "temp":
            return folder_paths.get_temp_directory()
        return None

    @staticmethod
    def _is_safe_relative_path(filename):
        """Return True if *filename* stays inside the directory it is joined to.

        The check is purely lexical: empty names, absolute or drive-qualified
        paths, and paths that normalise to something starting with ``..`` are
        rejected. Sub-directories (``"sub/notes.txt"``) remain allowed.
        """
        if not filename:
            return False
        separators = tuple(sep for sep in (os.sep, os.altsep) if sep)
        # A leading separator or a drive makes os.path.join discard the base.
        if filename.startswith(separators) or os.path.isabs(filename):
            return False
        if os.path.splitdrive(filename)[0]:
            return False
        normalized = os.path.normpath(filename)
        return (
            normalized != os.pardir
            and not normalized.startswith(os.pardir + os.sep)
        )

    @classmethod
    def _resolve_file(cls, file):
        """Parse a ``"<directory>/<filename>"`` selection and locate the file.

        Returns:
            A tuple ``(status, source_dir, filename, file_path)``. ``status``
            is one of ``"ok"``, ``"no_file"`` (empty or placeholder
            selection), ``"invalid"`` (no "/" separator), ``"invalid_dir"``
            (unknown directory key), ``"invalid_path"`` (filename would
            escape the directory) or ``"not_found"`` (no regular file at the
            resolved path). Fields that could not be determined are ``None``.
        """
        if not file or file == cls._NO_FILES_PLACEHOLDER:
            return "no_file", None, None, None
        if "/" not in file:
            return "invalid", None, None, None

        source_dir, filename = file.split("/", 1)
        if source_dir not in cls._SOURCE_DIRECTORIES:
            return "invalid_dir", source_dir, filename, None
        # Validate before touching the filesystem so "../" tricks never reach it.
        if not cls._is_safe_relative_path(filename):
            return "invalid_path", source_dir, filename, None

        file_path = os.path.join(cls._get_directory_path(source_dir), filename)
        # isfile (not exists): a directory must not be treated as loadable.
        if not os.path.isfile(file_path):
            return "not_found", source_dir, filename, file_path
        return "ok", source_dir, filename, file_path

    @classmethod
    def get_files_for_directory(cls, source_dir):
        """Get list of text files for the selected directory.

        Args:
            source_dir: Directory key, one of "input", "output" or "temp".

        Returns:
            The names of the regular files in that directory whose extension
            marks them as text files. An unknown key yields an empty list;
            a listing error is logged and yields whatever was collected.
        """
        if source_dir not in cls._SOURCE_DIRECTORIES:
            return []
        dir_path = cls._get_directory_path(source_dir)

        files = []
        try:
            for name in os.listdir(dir_path):
                is_text = name.lower().endswith(cls._TEXT_EXTENSIONS)
                if is_text and os.path.isfile(os.path.join(dir_path, name)):
                    files.append(name)
        except Exception as e:
            # Best effort: a broken directory must not prevent the node
            # from being listed, so report the problem and carry on.
            logger.warning(f"Error listing files in {source_dir}: {e}")
        return files

    def load_text(self, file: str):
        """Load text content from file.

        Args:
            file: Selection in ``"<directory>/<filename>"`` form.

        Returns:
            A one-element tuple holding the file content decoded as UTF-8.

        Raises:
            Exception: If the selection is missing or malformed, the file
                does not exist, cannot be decoded as UTF-8, or contains only
                whitespace.
        """
        try:
            status, source_dir, _, file_path = self._resolve_file(file)
            if status == "no_file":
                raise Exception("Please select a valid text file.")
            if status == "invalid":
                raise Exception(f"Invalid file format: {file}")
            if status == "invalid_dir":
                raise Exception(f"Invalid source directory: {source_dir}")
            if status == "invalid_path":
                raise Exception(f"Invalid file path: {file}")
            if status == "not_found":
                raise Exception(f"File not found: {file_path}")

            # Read file with UTF-8 encoding (most common)
            with open(file_path, 'r', encoding='utf-8') as f:
                text_content = f.read()

            if not text_content.strip():
                raise Exception("File is empty or contains only whitespace")
            return (text_content,)
        except UnicodeDecodeError as e:
            raise Exception(
                f"Encoding error reading file: {str(e)}. File may not be UTF-8 encoded."
            ) from e
        except Exception as e:
            logger.error(f"Failed to load text file: {str(e)}")
            raise Exception(f"Error loading text file: {str(e)}") from e

    @classmethod
    def IS_CHANGED(cls, file):
        """Cache key for ComfyUI.

        Returns the SHA-256 hex digest of the file content so the key changes
        whenever the file does. When the file cannot be resolved or read, a
        descriptive string derived from the selection is returned instead.
        """
        status, _, _, file_path = cls._resolve_file(file)
        if status == "no_file":
            return "no_file"
        if status != "ok":
            # e.g. "<file>_invalid", "<file>_invalid_dir", "<file>_not_found"
            return f"{file}_{status}"

        # Use file hash for cache invalidation
        try:
            digest = hashlib.sha256()
            with open(file_path, 'rb') as f:
                # Hash in chunks so large files are not held in memory.
                for chunk in iter(lambda: f.read(65536), b""):
                    digest.update(chunk)
            return digest.hexdigest()
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
            return f"{file}_error"

    @classmethod
    def VALIDATE_INPUTS(cls, file, **kwargs):
        """Validate that the file exists.

        Returns:
            ``True`` when the selection resolves to an existing file,
            otherwise a string describing the problem.
        """
        status, source_dir, filename, _ = cls._resolve_file(file)
        if status == "ok":
            return True
        if status == "no_file":
            return "No valid text file selected"
        if status == "invalid":
            return f"Invalid file format: {file}"
        if status == "invalid_dir":
            return f"Invalid source directory: {source_dir}"
        if status == "invalid_path":
            return f"Invalid file path: {file}"
        return f"File not found: {filename} in {source_dir}"