Simple as that

import requests
import json
import uuid
import time
from flask import Flask, request, Response, stream_with_context, jsonify
from random_user_agent.user_agent import UserAgent
from random_user_agent.params import SoftwareName, OperatingSystem
from time import time as current_time
import os
from functools import wraps
from dotenv import load_dotenv

# Load variables from a .env file into the process environment before the
# os.getenv() calls below read them.
load_dotenv()

app = Flask(__name__)

# Process-wide usage counters. They are updated by the chat handlers and
# reported on the "/" status page.
completed_prompts = 0  # number of chat requests that have finished
active_streams = 0  # streaming responses currently in flight
total_duration = 0.0  # summed duration (seconds) of finished requests

# One random Chrome (Windows/Linux) user agent is picked at import time and
# reused for every outgoing request, with a VSCode marker appended.
software_names = [SoftwareName.CHROME.value]
operating_systems = [OperatingSystem.WINDOWS.value, OperatingSystem.LINUX.value]
user_agent_rotator = UserAgent(software_names=software_names, operating_systems=operating_systems)
user_agent = f"{user_agent_rotator.get_random_user_agent()} VSCode/1.96.4"

# Secret that callers must present to use the proxy (see require_password).
PROXY_PASSWORD = os.getenv('PROXY_PASSWORD')
# Bearer token sent to the upstream API by chat_request.
key = os.getenv('ACCESS_TOKEN')
# Upstream endpoint that chat_request posts to.
API_ENDPOINT = "https://codestory-provider-dot-anton-390822.ue.r.appspot.com/openrouter-api"

def get_external_url_for_huggingface_space(space_id: str) -> str:
    """Build the public ``*.hf.space`` URL for a Hugging Face Space id.

    ``space_id`` is expected to look like ``"owner/space_name"``. Underscores
    in the space name are replaced with hyphens. Any failure (for example an
    id that does not split into exactly two parts) is printed and reported
    to the caller as an empty string.
    """
    try:
        pieces = space_id.split("/")
        owner, space = pieces  # anything other than two parts raises ValueError
        subdomain = "-".join((owner, space.replace('_', '-')))
        return f"https://{subdomain}.hf.space"
    except Exception as err:
        print(f"Error generating Hugging Face Space URL: {err}")
        return ""

def get_base_url(request) -> str:
    """Return the URL under which this proxy should be advertised.

    When the ``SPACE_ID`` environment variable is set and the request did not
    arrive on an ``hf.space`` host, the external Hugging Face Space URL is
    returned; in every other case the request's own ``base_url`` is used.
    """
    hf_space_id = os.getenv('SPACE_ID')
    # The host is only inspected when a space id is configured.
    if not hf_space_id or "hf.space" in request.host:
        return request.base_url
    return get_external_url_for_huggingface_space(hf_space_id)

def require_password(view_func):
    """Decorator that only runs ``view_func`` for callers presenting PROXY_PASSWORD.

    The secret may be supplied either as ``Authorization: Bearer <secret>`` or
    in an ``x-api-key`` header. Any other request receives a JSON
    ``{"error": "Unauthorized"}`` body with HTTP status 401.

    If PROXY_PASSWORD is unset or empty, nobody is authenticated; in
    particular an empty bearer token never matches an empty password.
    """
    import hmac  # function-scope import keeps this block self-contained

    def _matches(candidate):
        # Refuse empty credentials and an unconfigured password outright.
        if not candidate or not PROXY_PASSWORD:
            return False
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        return hmac.compare_digest(
            candidate.encode("utf-8"), PROXY_PASSWORD.encode("utf-8")
        )

    @wraps(view_func)
    def wrapper(*args, **kwargs):
        auth_header = request.headers.get("Authorization")
        if auth_header and auth_header.startswith("Bearer "):
            token = auth_header.split(" ")[1]
            if _matches(token):
                return view_func(*args, **kwargs)
        if _matches(request.headers.get("x-api-key")):
            return view_func(*args, **kwargs)
        return jsonify({"error": "Unauthorized"}), 401
    return wrapper

def chat_request(messages, temp, system, model=None, timeout=(10, 600)):
    """Send a streaming chat request to the upstream API.

    Args:
        messages: Conversation messages taken from the client request.
        temp: Sampling temperature, forwarded as ``temperature``.
        system: System prompt content; a generic default is used when falsy.
        model: Upstream model name; a default Claude 3.5 Sonnet id is used
            when omitted or when the known-broken ``:beta`` alias is given.
        timeout: ``(connect, read)`` timeout in seconds passed to
            ``requests.post`` so a stalled upstream cannot block forever.

    Returns:
        The open streaming ``requests.Response`` on success, or ``None`` when
        the request could not be made or the upstream answered with an error
        status. The caller is responsible for closing a returned response.
    """
    default_model = "claude-3-5-sonnet-20241022"

    if not system:
        system = [{"type": "text", "text": "You are a helpful assistant that follows all user instructions."}]

    # anthropic/claude-3-5-sonnet:beta doesn't work, so it is mapped to the
    # default model just like a missing model name.
    if not model or model == "anthropic/claude-3-5-sonnet:beta":
        model = default_model

    payload = {
        "model": model,
        "temperature": temp,
        "stream": True,
        # NOTE(review): `messages` is inserted as a single element rather than
        # spread into this list; kept as-is -- confirm the upstream expects it.
        "messages": [
            {"role": "system", "content": system},
            messages
        ]
    }
    try:
        resp = requests.post(
            API_ENDPOINT,
            headers={
                "authorization": f"Bearer {key}",
                "content-type": "application/json",
                "User-Agent": user_agent
            },
            json=payload, stream=True, timeout=timeout
        )
    except requests.RequestException as e:
        # Connection errors and timeouts are reported the same way as an
        # upstream error status: the caller just sees None.
        print(f"Upstream request failed: {e}")
        return None

    if not resp.ok:
        print(f"Upstream returned HTTP {resp.status_code}")
        # With stream=True the connection stays checked out until closed.
        resp.close()
        return None
    return resp

@app.route("/", methods=["GET"])
def root():
    """Render a small HTML status page with the proxy's usage counters.

    The page shows the number of completed requests, the number of active
    streams, the average request duration and the proxy's public URL
    (forced to https). The JSON is HTML-escaped before being embedded because
    the URL is derived from the incoming request.
    """
    import html  # function-scope import keeps this block self-contained

    average_duration = total_duration / completed_prompts if completed_prompts > 0 else 0
    base_url = get_base_url(request).rstrip('/')
    if base_url.startswith("http://"):
        base_url = base_url.replace("http://", "https://", 1)
    response_data = {
        "Total Requests": completed_prompts,
        "Active Requests": active_streams,
        "Average Duration": average_duration,
        "Proxy Endpoint": base_url
    }
    pretty_json = json.dumps(response_data, indent=4, sort_keys=False)
    # Escape so that markup smuggled in through the request host cannot be
    # interpreted by the browser.
    safe_json = html.escape(pretty_json)
    html_content = f"""
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>Proxy Status</title>
        <style>
            body {{
                font-family: Arial, sans-serif;
                margin: 20px;
                background-color: #f4f4f9;
                color: #333;
                display: flex;
                flex-direction: column;
                align-items: center;
            }}
            h1 {{
                color: #444;
            }}
            pre {{
                background-color: #fff;
                padding: 15px;
                border-radius: 5px;
                border: 1px solid #ddd;
                max-width: 600px;
                overflow-x: auto;
                width: 100%;
                box-sizing: border-box;
            }}
        </style>
    </head>
    <body>
        <pre>{safe_json}</pre>
    </body>
    </html>
    """

    return Response(html_content, content_type='text/html')

@app.route("/chat/completions", methods=["POST"])
@app.route("/v1/chat/completions", methods=["POST"])
@require_password
def handle_openai_chat():
    """Proxy a chat request and answer with OpenAI-style streaming chunks.

    Reads ``messages``, ``temperature``, ``system``, ``model`` and ``stream``
    (default ``True``) from the JSON body and forwards them via
    ``chat_request``.

    Returns:
        * streaming: a ``text/event-stream`` response of
          ``data: {"choices": [{"delta": {"content": ...}}]}`` events followed
          by a ``finish_reason`` event once the upstream reports completion;
        * non-streaming: ``{"type": "message", "content": [...]}`` with the
          concatenated text.
          NOTE(review): this is not the OpenAI ``choices`` shape; it is kept
          unchanged for existing callers.
        * HTTP 400 for a body that is not a JSON object, HTTP 502 when the
          upstream request fails.
    """
    global completed_prompts, total_duration
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return {"error": "Request body must be a JSON object"}, 400
    streaming = data.get("stream", True)
    start_time = current_time()

    result = chat_request(
        messages=data.get("messages"),
        temp=data.get("temperature"),
        system=data.get("system"),
        model=data.get("model")
    )

    if not result:
        # Report the failure with an error status instead of a 200.
        return {"error": "Request failed"}, 502

    def iter_deltas():
        """Yield ``(text, finished)`` for every usable upstream SSE data line."""
        for raw in result.iter_lines():
            if not raw:
                continue
            line = raw.decode('utf-8', errors='replace')
            if line.startswith(':'):
                continue  # SSE comment line
            # Strip only the leading SSE field name; a blanket replace would
            # also delete "data: " occurring inside the generated text.
            if line.startswith('data:'):
                line = line[len('data:'):].lstrip()
            if line == '[DONE]':
                return
            try:
                d = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"JSON decode error: {e}")
                continue
            try:
                choice = d['choices'][0]
                text = (choice.get('delta') or {}).get('content') or ''
                finished = choice.get('finish_reason') is not None
            except (KeyError, IndexError, TypeError, AttributeError):
                continue  # line carries no usable choice
            yield (text if isinstance(text, str) else ''), finished

    if streaming:
        def generate():
            global active_streams, completed_prompts, total_duration
            # Counted here rather than before the Response is built, so the
            # counter cannot leak if the generator is never started.
            active_streams += 1
            try:
                for chunk, finished in iter_deltas():
                    if chunk:
                        yield f"data: {json.dumps({'choices': [{'delta': {'content': chunk}}]})}\n\n"
                    if finished:
                        yield f"data: {json.dumps({'choices': [{'finish_reason': 'stop'}]})}\n\n"
                        break
            finally:
                # GeneratorExit (client disconnect) is deliberately not caught:
                # it must propagate, and this cleanup still runs.
                result.close()
                active_streams -= 1
                total_duration += current_time() - start_time
                completed_prompts += 1
                print("Generator cleanup complete")

        return Response(stream_with_context(generate()), content_type='text/event-stream', headers={'Cache-Control': 'no-cache', 'Connection': 'keep-alive'})

    parts = []
    try:
        for chunk, finished in iter_deltas():
            if chunk:
                parts.append(chunk)
            if finished:
                break
    finally:
        result.close()
    txt = "".join(parts)
    total_duration += current_time() - start_time
    completed_prompts += 1
    return {"type": "message", "content": [{"type": "text", "text": txt}]}

@app.route("/messages", methods=["POST"])
@app.route("/v1/messages", methods=["POST"])
@require_password
def handle_anthropic_chat():
    """Proxy a chat request and answer in Anthropic Messages format.

    Reads ``messages``, ``temperature``, ``system``, ``model`` and ``stream``
    (default ``True``) from the JSON body and forwards them via
    ``chat_request``.

    Returns:
        * streaming: a ``text/event-stream`` response with ``message_start``,
          ``content_block_start``, ``content_block_delta`` events and, once
          the upstream reports completion, ``content_block_stop``,
          ``message_delta`` and ``message_stop``;
        * non-streaming: a single ``message`` object with the concatenated
          text (``output_tokens`` is a whitespace word count);
        * HTTP 400 for a body that is not a JSON object, HTTP 502 when the
          upstream request fails.
    """
    global completed_prompts, total_duration
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return {"error": "Request body must be a JSON object"}, 400
    streaming = data.get("stream", True)
    start_time = current_time()

    result = chat_request(
        messages=data.get("messages"),
        temp=data.get("temperature"),
        system=data.get("system"),
        model=data.get("model")
    )

    if not result:
        # Report the failure with an error status instead of a 200.
        return {"error": "Request failed"}, 502

    def iter_deltas():
        """Yield ``(text, finished)`` for every usable upstream SSE data line."""
        for raw in result.iter_lines():
            if not raw:
                continue
            line = raw.decode('utf-8', errors='replace')
            if line.startswith(':'):
                continue  # SSE comment line
            # Strip only the leading SSE field name; a blanket replace would
            # also delete "data: " occurring inside the generated text.
            if line.startswith('data:'):
                line = line[len('data:'):].lstrip()
            if line == '[DONE]':
                return
            try:
                d = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"JSON decode error: {e}")
                continue
            try:
                choice = d['choices'][0]
                text = (choice.get('delta') or {}).get('content') or ''
                finished = choice.get('finish_reason') is not None
            except (KeyError, IndexError, TypeError, AttributeError):
                continue  # line carries no usable choice
            yield (text if isinstance(text, str) else ''), finished

    def sse(payload):
        """Format one server-sent event named after the payload's ``type``."""
        return f"event: {payload['type']}\ndata: {json.dumps(payload)}\n\n"

    if streaming:
        def generate():
            global active_streams, completed_prompts, total_duration
            # Counted here rather than before the Response is built, so the
            # counter cannot leak if the generator is never started.
            active_streams += 1
            try:
                yield sse({'type': 'message_start', 'message': {'id': str(uuid.uuid4()), 'type': 'message', 'role': 'assistant', 'content': [], 'model': data.get('model'), 'stop_reason': None, 'stop_sequence': None, 'usage': {'input_tokens': 0, 'output_tokens': 0}}})
                # Open text block 0 before any content_block_delta refers to it.
                yield sse({'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}})

                for chunk, finished in iter_deltas():
                    if chunk:
                        yield sse({'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': chunk}})
                    if finished:
                        yield sse({'type': 'content_block_stop', 'index': 0})
                        yield sse({'type': 'message_delta', 'delta': {'stop_reason': 'end_turn', 'stop_sequence': None}, 'usage': {'output_tokens': 0}})
                        yield sse({'type': 'message_stop'})
                        break
            finally:
                # GeneratorExit (client disconnect) is deliberately not caught:
                # it must propagate, and this cleanup still runs.
                result.close()
                active_streams -= 1
                total_duration += current_time() - start_time
                completed_prompts += 1
                print("Generator cleanup complete")

        return Response(stream_with_context(generate()), content_type='text/event-stream', headers={'Cache-Control': 'no-cache', 'Connection': 'keep-alive'})

    parts = []
    try:
        for chunk, finished in iter_deltas():
            if chunk:
                parts.append(chunk)
            if finished:
                break
    finally:
        result.close()
    txt = "".join(parts)
    total_duration += current_time() - start_time
    completed_prompts += 1
    return {
        "content": [{"text": txt, "type": "text"}],
        "id": str(uuid.uuid4()),
        "model": data.get("model"),
        "role": "assistant",
        "stop_reason": "end_turn",
        "stop_sequence": None,
        "type": "message",
        "usage": {
            "input_tokens": 0,
            "output_tokens": len(txt.split())
        }
    }

@app.route("/models", methods=["GET"])
@app.route("/v1/models", methods=["GET"])
def list_models():
    """Return the model list fetched from the OpenRouter models endpoint.

    Returns:
        The upstream JSON on success; ``{"error": ...}`` with the upstream
        status code when it answers with an error status; ``{"error": ...}``
        with status 500 when the request fails, times out, or the body is
        not valid JSON.
    """
    try:
        response = requests.get(
            "https://openrouter.ai/api/v1/models",
            headers={"User-Agent": user_agent},
            timeout=10,  # do not let a stalled upstream block the worker
        )
        if not response.ok:
            return {"error": "Failed to fetch models"}, response.status_code
        return response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers an undecodable JSON body on requests versions
        # whose JSON error is not a RequestException.
        return {"error": str(e)}, 500

# When executed directly as a script, start Flask's built-in server on port 7860.
if __name__ == "__main__":
    app.run(port=7860)
Edit Report
Pub: 16 Feb 2025 13:39 UTC
Views: 943