"""Flask app: upload a batch of images and collect bounding-box annotations."""
import json
import logging
import os
import shutil
import uuid

from flask import Flask, redirect, url_for, request, flash, session, after_this_request
from flask import render_template
from flask import send_file

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

UPLOAD_FOLDER = './images'
OUT_FOLDER = './anno'

# Header row written at the top of every per-user annotation CSV.
_ANNOTATION_HEADER = "image,category,name,xMin,yMin,xMax,yMax\n"

app = Flask(__name__)
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
# NOTE(review): the fallback key is committed to source control; set the
# SECRET_KEY environment variable in any real deployment so session cookies
# cannot be forged.
app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", '#thaistartsfirst!')
app.config["IMAGES"] = UPLOAD_FOLDER
app.config["OUT"] = OUT_FOLDER
# NOTE(review): LABELS, CATEGORIES, HEAD (and FILES, set on upload) live in
# app.config, which is process-wide, so concurrent users share this state.
app.config["LABELS"] = []      # boxes drawn on the image currently shown
app.config["CATEGORIES"] = {}  # label name -> integer category id
app.config["HEAD"] = 0         # index into FILES of the image currently shown
app.config["SESSION_PERMANENT"] = False

# Ensure the upload and output directories exist.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUT_FOLDER, exist_ok=True)


def get_anno_path(user_id):
    """Return the path of the annotation CSV belonging to *user_id*."""
    return os.path.join(OUT_FOLDER, f'{user_id}.csv')


def get_images_directory(user_id):
    """Return the directory that holds the images uploaded by *user_id*."""
    return os.path.join(UPLOAD_FOLDER, f'{user_id}')


@app.route('/', methods=['GET', 'POST'])
def index():
    """Render the upload page (GET) or store an uploaded batch (POST).

    A session id is assigned on first visit and the user's annotation CSV is
    initialised with its header row. On POST every uploaded file is saved into
    the user's image directory, the shared annotation state is reset, and the
    client is redirected to ``/tagger``.
    """
    user_id = session.get('_id')
    if user_id is None:
        # Stored as a plain string; it is only ever interpolated into paths.
        user_id = str(uuid.uuid4())
        session['_id'] = user_id
        logger.info(user_id)

    # Create the CSV whenever it is missing, not only for brand-new sessions,
    # so a returning user whose file was removed still gets the header row.
    anno_path = get_anno_path(user_id)
    if not os.path.exists(anno_path):
        with open(anno_path, 'w') as f:
            f.write(_ANNOTATION_HEADER)

    if request.method != 'POST':
        return render_template('index.html')

    if 'file' not in request.files:
        flash('No files selected')
        return redirect('/')

    img_dir = get_images_directory(user_id)
    try:
        os.makedirs(img_dir)
    except FileExistsError:
        logger.info('user already has an active session')

    filenames = []
    for f in request.files.getlist("file"):
        # The client controls the filename: keep only its final path component
        # so it cannot escape img_dir, and skip empty parts (a form submitted
        # with no file chosen still sends one part with an empty filename).
        safe_name = os.path.basename((f.filename or '').replace('\\', '/'))
        if safe_name in ('', '.', '..'):
            continue
        f.save(os.path.join(img_dir, safe_name))
        filenames.append(safe_name)

    if not filenames:
        flash('No files selected')
        return redirect('/')

    # Start the new batch from a clean state; a stale HEAD or leftover LABELS
    # from a previous batch would otherwise point past or into the wrong image.
    app.config["FILES"] = filenames
    app.config["HEAD"] = 0
    app.config["LABELS"] = []
    return redirect('/tagger', code=302)
@app.route('/tagger')
def tagger():
    """Show the current image with its pending labels.

    Redirects to the upload page when no batch has been uploaded, and to the
    final page (resetting HEAD) once every image of the batch has been shown.
    """
    files = app.config.get("FILES")
    if files is None:
        # Nothing uploaded yet in this process.
        return redirect(url_for('index'))

    head = app.config["HEAD"]
    if head >= len(files):
        # done annotating the batch of images
        app.config["HEAD"] = 0
        return redirect(url_for('final'))

    user_id = session.get('_id')
    img_dir = get_images_directory(user_id)
    not_end = head != len(files) - 1
    return render_template(
        'tagger.html',
        not_end=not_end,
        directory=img_dir,
        image=files[head],
        labels=app.config["LABELS"],
        head=head + 1,
        len=len(files),
    )


@app.route('/next')
def next():
    """Append the current image's labels to the user's CSV and advance HEAD."""
    import csv

    files = app.config.get("FILES") or []
    head = app.config["HEAD"]
    if head >= len(files):
        # No current image to save; let tagger decide where to go.
        return redirect(url_for('tagger'))

    image = files[head]
    rows = [
        [
            image,
            str(entry["category"]),
            entry["name"],
            str(round(float(entry["xMin"]))),
            str(round(float(entry["yMin"]))),
            str(round(float(entry["xMax"]))),
            str(round(float(entry["yMax"]))),
        ]
        for entry in app.config["LABELS"]
    ]

    user_id = session.get("_id")
    anno_path = get_anno_path(user_id)
    # csv.writer quotes fields as needed, so a label name containing a comma
    # no longer corrupts the row.
    with open(anno_path, 'a+', newline='') as f:
        csv.writer(f, lineterminator='\n').writerows(rows)

    app.config["HEAD"] = head + 1
    app.config["LABELS"] = []
    return redirect(url_for('tagger'))


@app.route("/final")
def final():
    """Render the page shown after the last image of the batch."""
    return render_template('final.html')


@app.route('/add/<id>')
def add(id):
    """Add an unnamed bounding box, taken from the query string, to LABELS.

    Requests whose xMin/xMax/yMin/yMax are missing or not finite numbers are
    ignored, because ``/next`` later converts them with ``round(float(...))``.
    """
    import math

    coords = {key: request.args.get(key) for key in ("xMin", "xMax", "yMin", "yMax")}
    try:
        valid = all(math.isfinite(float(value)) for value in coords.values())
    except (TypeError, ValueError):
        valid = False
    if not valid:
        logger.warning("ignoring box with invalid coordinates: %s", coords)
        return redirect(url_for('tagger'))

    app.config["LABELS"].append({"id": id, "name": "", "category": "", **coords})
    return redirect(url_for('tagger'))


@app.route('/remove/<id>')
def remove(id):
    """Delete the label with 1-based *id* and renumber the labels after it."""
    labels = app.config["LABELS"]
    try:
        position = int(id) - 1
    except ValueError:
        position = -1
    if not 0 <= position < len(labels):
        logger.warning("ignoring removal of unknown label id %r", id)
        return redirect(url_for('tagger'))

    del labels[position]
    for entry in labels[position:]:
        entry["id"] = str(int(entry["id"]) - 1)
    return redirect(url_for('tagger'))


@app.route('/label/<id>')
def label(id):
    """Set the (lower-cased) name of label *id* and assign its category id.

    A name seen for the first time gets the next unused integer category.
    """
    labels = app.config["LABELS"]
    try:
        position = int(id) - 1
    except ValueError:
        position = -1
    if not 0 <= position < len(labels):
        logger.warning("ignoring name for unknown label id %r", id)
        return redirect(url_for('tagger'))

    name = request.args.get("name", "").lower()
    categories = app.config["CATEGORIES"]
    category = categories.get(name)
    if category is None:
        # add to category
        category = len(categories)
        categories[name] = category

    labels[position]["name"] = name
    labels[position]["category"] = category
    return redirect(url_for('tagger'))


@app.route('/image/<path:f>')
def images(f):
    """Serve the uploaded image *f* from the current user's image directory."""
    from flask import send_from_directory

    user_id = session.get('_id')
    # Uploads are saved relative to the working directory, so resolve the
    # directory the same way; send_from_directory refuses paths that would
    # escape it.
    img_dir = os.path.abspath(get_images_directory(user_id))
    return send_from_directory(img_dir, f)


@app.route('/download')
def download():
    """Bundle the user's images and annotations into a zip and send it.

    The annotation CSV is converted to ``metadata.jsonl`` (one JSON object per
    image) inside the image directory, the directory is zipped, and the image
    directory, CSV and archive are removed once the response has been built.
    """
    import csv

    user_id = session.get('_id')
    if user_id is None:
        return redirect(url_for('index'))

    anno_path = get_anno_path(user_id)
    img_dir = get_images_directory(user_id)
    if not (os.path.isfile(anno_path) and os.path.isdir(img_dir)):
        # Nothing uploaded, or already downloaded and cleaned up.
        return redirect(url_for('index'))

    header = ["image", "category", "name", "xMin", "yMin", "xMax", "yMax"]
    data = {}
    with open(anno_path, 'r', newline='') as f:
        for row in csv.reader(f):
            if not row or row == header:
                continue
            try:
                if len(row) < len(header):
                    raise ValueError("too few columns")
                category = int(row[1])
                bbox = [int(value) for value in row[3:]]
            except ValueError:
                # e.g. a box that was never named has an empty category.
                logger.warning("skipping malformed annotation row: %s", row)
                continue
            objects = data.setdefault(row[0], {'bbox': [], 'categories': [], 'names': []})
            objects['categories'].append(category)
            objects['names'].append(row[2])
            objects['bbox'].append(bbox)

    with open(os.path.join(img_dir, 'metadata.jsonl'), 'w') as f:
        for file_name, obj in data.items():
            f.write(json.dumps({'file_name': file_name, 'objects': obj}) + "\n")

    # Build a per-user archive at an absolute path outside img_dir, so users
    # do not overwrite each other's zip and send_file opens exactly the file
    # make_archive wrote.
    archive_base = os.path.abspath(os.path.join(app.config["OUT"], f'{user_id}_final'))
    archive_path = shutil.make_archive(archive_base, 'zip', img_dir)

    @after_this_request
    def remove_directory(response):
        # Best-effort cleanup: one failure must not prevent the other removals.
        for remove, target in ((shutil.rmtree, img_dir),
                               (os.remove, anno_path),
                               (os.remove, archive_path)):
            try:
                remove(target)
            except OSError as e:
                logger.warning("Error deleting %s: %s", target, e)
        return response

    return send_file(archive_path,
                     mimetype='application/zip',
                     download_name='annotated_data.zip',
                     as_attachment=True)


if __name__ == "__main__":
    app.run(debug=True)