# title: Pipe function to connect DeepSeek model created in Azure AI Foundry in Open WebUI
# author: snowpeak
# author_url: https://github.com/xfsnow
# version: 0.1.1
# Reference: https://gist.github.com/sht2017/fa69af95941516dee947d53228bf6afc
import base64
import json
from typing import Union, Generator, Iterator
from pydantic import BaseModel
import requests
import os
class Pipe:
    """Open WebUI pipe that forwards chat-completion requests to an Azure AI endpoint.

    The request body is reduced to a whitelist of chat-completion parameters and
    POSTed to ``<AZURE_ENDPOINT>/chat/completions``. Requests whose messages embed a
    ``<chat_history>...</chat_history>`` block (presumably Open WebUI's title/tag
    generation prompts -- confirm against the Open WebUI version in use) are answered
    locally so that no model call is spent on them.
    """

    class Valves(BaseModel):
        """Configuration values; defaults are read from environment variables."""

        # Base URL of the inference endpoint; "/chat/completions" is appended to it.
        AZURE_ENDPOINT: str = os.getenv("AZURE_API_ENDPOINT", "Azure AI model inference endpoint")
        # Sent as the "api-key" request header.
        AZURE_API_KEY: str = os.getenv("AZURE_API_KEY", "Secret Key")
        # Sent as the "x-ms-model-mesh-model-name" request header.
        AZURE_MODEL_NAME: str = os.getenv("AZURE_MODEL_NAME", "Model name")

    # Markers delimiting the conversation excerpt inside helper prompts.
    _HISTORY_OPEN = "<chat_history>"
    _HISTORY_CLOSE = "</chat_history>"

    # Request-body keys that are forwarded to the endpoint; all other keys are dropped.
    _ALLOWED_PARAMS = frozenset(
        {
            "messages",
            "temperature",
            "role",
            "content",
            "contentPart",
            "contentPartImage",
            "enhancements",
            "dataSources",
            "n",
            "stream",
            "stop",
            "max_tokens",
            "presence_penalty",
            "frequency_penalty",
            "logit_bias",
            "user",
            "function_call",
            "functions",
            "tools",
            "tool_choice",
            "top_p",
            "log_probs",  # kept for backward compatibility with existing callers
            "logprobs",
            "top_logprobs",
            "response_format",
            "seed",
        }
    )

    def __init__(self):
        self.name = "Azure AI Foundry Pipe"
        self.valves = self.Valves()

    @staticmethod
    def _local_title(messages_json: str) -> Union[str, None]:
        """Derive a short placeholder title from the embedded chat history.

        Args:
            messages_json: The request messages serialized with ``json.dumps``.

        Returns:
            ``"<3 chars of user text>:<3 chars of assistant text>-<10 base64 chars>"``,
            or ``None`` when the history block does not have the expected
            ``USER: ...`` / ``ASSISTANT: ...`` layout.
        """
        _, opened, remainder = messages_json.partition(Pipe._HISTORY_OPEN)
        chat_history, closed, _ = remainder.partition(Pipe._HISTORY_CLOSE)
        if not (opened and closed):
            return None

        # The input is JSON text, so a line break appears as the two characters
        # backslash + "n". Remove whole escape sequences only; str.strip("\\n")
        # would treat the argument as a character set and could eat a real "n".
        newline = "\\n"
        trimmed = chat_history
        while trimmed.startswith(newline):
            trimmed = trimmed[len(newline):]
        while trimmed.endswith(newline):
            trimmed = trimmed[: -len(newline)]

        user_line, separator, assistant_part = trimmed.partition(newline)
        if not separator or "USER:" not in user_line or "ASSISTANT:" not in assistant_part:
            return None

        user_text = user_line.split("USER:", 1)[1]
        assistant_text = assistant_part.split("ASSISTANT:", 1)[1]
        # The digest is taken over the untrimmed history block.
        digest = base64.b64encode(chat_history.encode("utf-8")).decode("utf-8")
        return f"{user_text[:3]}:{assistant_text[:3]}-{digest[:10]}"

    def pipe(self, body: dict, __user__: dict) -> Union[str, Generator, Iterator]:
        """Handle one chat request.

        Args:
            body: Chat-completion request body. It is not modified.
            __user__: User information supplied by the host; unused here.

        Returns:
            * ``{"tags": []}`` for a helper prompt that contains a chat-history
              block and the text ``"JSON format"``;
            * a locally built placeholder title for other chat-history prompts;
            * ``response.iter_lines()`` when ``body["stream"]`` is truthy;
            * the decoded JSON response otherwise;
            * an ``"Error: ..."`` string if the HTTP call or decoding fails.
        """
        if body.get("messages", False):
            messages_json = json.dumps(body["messages"], ensure_ascii=False)
            if self._HISTORY_OPEN in messages_json and self._HISTORY_CLOSE in messages_json:
                if "JSON format" in messages_json:
                    return {"tags": []}
                title = self._local_title(messages_json)
                if title is not None:
                    return title
                # Unrecognized history layout: fall through to a real model call.

        payload = {key: value for key, value in body.items() if key in self._ALLOWED_PARAMS}

        # The "user" field must be a string; normalize on the copy so the
        # caller's dict is left untouched.
        if "user" in payload and not isinstance(payload["user"], str):
            user = payload["user"]
            if isinstance(user, dict) and "id" in user:
                payload["user"] = user.get("id")
            else:
                payload["user"] = str(user)

        # Tolerate an endpoint configured with a trailing slash.
        url = f"{self.valves.AZURE_ENDPOINT.rstrip('/')}/chat/completions"
        headers = {
            "api-key": self.valves.AZURE_API_KEY,
            "Content-Type": "application/json",
            "x-ms-model-mesh-model-name": self.valves.AZURE_MODEL_NAME,
        }

        response = None
        try:
            response = requests.post(
                url=url, json=payload, headers=headers, stream=True, timeout=30
            )
            response.raise_for_status()
            if body.get("stream", False):
                return response.iter_lines()
            return response.json()
        except Exception as e:
            # Boundary handler: report the failure as text instead of raising.
            print("Error calling Azure Chat Completions API")
            response_text = response.text if response is not None else ""
            return f"Error: {e} ({response_text})"