# FistalAI / server.py
# In the name of Allah, the Most Gracious, the Most Merciful
from fastmcp import FastMCP
import asyncio
import json
import os
import time
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from dotenv import load_dotenv
import nltk
import sys
from modal_tool import train_with_modal, app, upload_to_hf_from_volume, evaluate_model
# Load environment variables from a local .env file (if present) before reading keys.
load_dotenv()

mcp = FastMCP(name="FistalMCP")

groq = os.getenv("GROQ_API_KEY")
hf = os.getenv("HF_TOKEN")
if not groq:
    print("GROQ_API_KEY missing!", file=sys.stderr)
if not hf:
    print("HF_TOKEN missing!", file=sys.stderr)

gk1 = os.environ.get("GOOGLE_API_KEY_1")
gk2 = os.environ.get("GOOGLE_API_KEY_2")
gk3 = os.environ.get("GOOGLE_API_KEY_3")
GOOGLE_API_KEYS = [k for k in [gk1, gk2, gk3] if k]
if not GOOGLE_API_KEYS:
    print("No Google API keys found!", file=sys.stderr)
async def genBatch(topic: str, samples_per_batch: int, batch_num: int, api_key: str, task_type: str) -> list:
    """Generate one batch of samples using a single API key."""
    if not api_key or api_key == "YOUR_API_KEY":
        return []

    llm = ChatGoogleGenerativeAI(
        model="gemini-2.5-flash",
        temperature=0.7,
        google_api_key=api_key
    )

    prompt_template = """
    You are an expert dataset generator.
    Generate authentic, high-quality data on the topic: {topic} for task type: {task_type} using your knowledge.
    Generate exactly {num} concise, varied, and high-quality samples.
    Return a JSON list of objects, each with keys: instruction, input, and output.
    Do not add extra text, markdown, or code fences.
    RESPONSE:
    """
    promptJSON = ChatPromptTemplate.from_template(prompt_template)
    chain = promptJSON | llm

    try:
        user_input = {
            "topic": topic,
            "num": samples_per_batch,
            "task_type": task_type
        }
        response = await asyncio.to_thread(chain.invoke, user_input)
        content = response.content.strip()

        # Strip markdown code fences if the model wrapped its JSON in them.
        if content.startswith("```json"):
            content = content[7:]
        if content.startswith("```"):
            content = content[3:]
        if content.endswith("```"):
            content = content[:-3]
        content = content.strip()

        data = json.loads(content)
        return data if isinstance(data, list) else [data]
    except json.JSONDecodeError as e:
        print(f"JSON decode error in batch {batch_num}: {e}")
        return []
    except Exception as e:
        print(f"Error in batch {batch_num}: {e}")
        return []
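
# Each successful batch resolves to a list of sample dicts such as (illustrative values):
#   [{"instruction": "Explain ...", "input": "", "output": "..."}, ...]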
@mcp.tool()
async def generate_json_data(topic: str, task_type: str, num_samples: int = 1000) -> str:
    """
    Generate a training dataset with instruction, input, and output fields.
    Uses parallel batching for efficiency. Can generate up to 2000 samples.
    Args:
        topic: The topic or theme for the dataset
        task_type: The task the dataset targets (e.g. text-generation, classification)
        num_samples: Number of training examples to generate (recommended: 100-2000)
    Returns:
        JSON string with status, topic, total_samples, and data array
    """
    topic = str(topic).strip() if topic else ""
    task_type = str(task_type).strip() if task_type else "text-generation"
    try:
        num_samples = int(num_samples)
    except (ValueError, TypeError):
        num_samples = 100

    if not topic:
        return json.dumps({
            "status": "error",
            "message": "Topic cannot be empty"
        })

    # Clamp out-of-range requests into the supported 50-2000 range.
    if num_samples <= 0 or num_samples > 2000:
        num_samples = min(max(50, num_samples), 2000)

    valid_keys = [k for k in GOOGLE_API_KEYS if k and k.strip() and k != "YOUR_API_KEY"]
    if not valid_keys:
        return json.dumps({
            "status": "error",
            "message": "No valid Google API keys configured"
        })

    start_time = time.time()
    samples_per_batch = 50
    total_batches = (num_samples + samples_per_batch - 1) // samples_per_batch

    try:
        # Build one genBatch task per batch, rotating round-robin through the API keys.
        tasks = []
        for batch_num in range(total_batches):
            api_key = valid_keys[batch_num % len(valid_keys)]
            task = genBatch(
                topic=topic,
                samples_per_batch=samples_per_batch,
                batch_num=batch_num + 1,
                api_key=api_key,
                task_type=task_type
            )
            tasks.append(task)

        # Run all batches concurrently; failed batches are skipped rather than aborting the run.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        all_samples = []
        for batch_result in results:
            if isinstance(batch_result, Exception):
                continue
            if isinstance(batch_result, list):
                all_samples.extend(batch_result)

        all_samples = all_samples[:num_samples]
        end_time = time.time()
        gen_time = end_time - start_time

        return json.dumps({
            "status": "success",
            "topic": topic,
            "task_type": task_type,
            "total_samples": len(all_samples),
            "requested_samples": num_samples,
            "total_batches": total_batches,
            "generation_time_seconds": round(gen_time, 1),
            "generation_time_minutes": round(gen_time / 60, 2),
            "samples_per_second": round(len(all_samples) / gen_time, 2) if gen_time > 0 else 0,
            "data": all_samples
        })
    except Exception as e:
        return json.dumps({
            "status": "error",
            "message": f"Error generating dataset: {str(e)}"
        })
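
# On success generate_json_data returns a JSON string shaped like (illustrative values):
#   {"status": "success", "topic": "...", "task_type": "...", "total_samples": 1000,
#    "requested_samples": 1000, "total_batches": 20, "generation_time_seconds": 95.3,
#    "generation_time_minutes": 1.59, "samples_per_second": 10.5,
#    "data": [{"instruction": "...", "input": "...", "output": "..."}, ...]}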
@mcp.tool()
async def format_json(raw_data) -> str:
    """
    Convert raw dataset to ChatML format for training
    Args:
        raw_data: List or JSON string of samples with instruction/input/output
    Returns:
        JSON string with status, num_samples, and formatted data
    """
    try:
        # Accept a list, a JSON string, or a dict wrapping the samples under "data".
        if isinstance(raw_data, list):
            data = raw_data
        elif isinstance(raw_data, str):
            parsed = json.loads(raw_data)
            if isinstance(parsed, dict) and "data" in parsed:
                data = parsed["data"]
            else:
                data = parsed
        elif isinstance(raw_data, dict) and "data" in raw_data:
            data = raw_data["data"]
        else:
            return json.dumps({
                "status": "error",
                "message": f"Unexpected input type: {type(raw_data).__name__}"
            })

        if not isinstance(data, list):
            return json.dumps({
                "status": "error",
                "message": "Data must be a list of samples"
            })

        # Convert to ChatML format
        converted = []
        for item in data:
            if not isinstance(item, dict):
                continue
            if 'instruction' not in item or 'output' not in item:
                continue
            user_msg = str(item['instruction'])
            if item.get('input'):
                user_msg += f"\n\n{item['input']}"
            converted.append({
                "messages": [
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": user_msg},
                    {"role": "assistant", "content": str(item['output'])}
                ]
            })

        if not converted:
            return json.dumps({
                "status": "error",
                "message": "No valid samples to format"
            })

        return json.dumps({
            "status": "success",
            "num_samples": len(converted),
            "data": converted,
            "message": f"✅ Formatted {len(converted)} samples"
        }, ensure_ascii=False)
    except Exception as e:
        import traceback
        return json.dumps({
            "status": "error",
            "message": f"Formatting failed: {str(e)}",
            "traceback": traceback.format_exc()
        })
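
# format_json wraps each sample in a ChatML-style record, e.g. (illustrative):
#   {"messages": [
#       {"role": "system", "content": "You are a helpful assistant."},
#       {"role": "user", "content": "<instruction, plus the input when present>"},
#       {"role": "assistant", "content": "<output>"}]}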
@mcp.tool()
async def finetune_model(formatted_data, model_name: str, topic: str, task_type: str) -> str:
    """
    Fine-tune model on Modal GPU
    Args:
        formatted_data: List or JSON string with formatted training samples
        model_name: Base model to fine-tune
        topic: Topic the dataset was generated for (used to name the HF repo)
        task_type: Task type the dataset targets (used to name the HF repo)
    Returns:
        JSON string with status, repo_id, model_url
    """
    model_name = str(model_name).strip()
    models = [
        "unsloth/Llama-3.2-1B-Instruct-bnb-4bit",
        "unsloth/Phi-3-mini-4k-instruct",
        "unsloth/Phi-3-medium-4k-instruct",
        "unsloth/Llama-3.2-3B-Instruct-bnb-4bit",
        "unsloth/Qwen2.5-3B-Instruct-bnb-4bit",
        "unsloth/Qwen2.5-1.5B-Instruct-bnb-4bit",
        "unsloth/Qwen2.5-0.5B-Instruct-bnb-4bit",
        "unsloth/Qwen2.5-Coder-3B-Instruct-bnb-4bit",
        "unsloth/gemma-2-2b-it-bnb-4bit",
        "unsloth/SmolLM2-1.7B-Instruct-bnb-4bit",
        "unsloth/Phi-3.5-mini-instruct-bnb-4bit",
        "unsloth/Granite-3.0-2b-instruct-bnb-4bit",
        "unsloth/granite-4.0-h-1b-bnb-4bit"
    ]
    if model_name not in models:
        return json.dumps({
            "status": "error",
            "message": f"Model not supported. Choose from: {', '.join(models[:3])}..."
        })

    try:
        # Accept a list, a JSON string, or a dict wrapping the samples under "data".
        if isinstance(formatted_data, list):
            training_data = formatted_data
        elif isinstance(formatted_data, str):
            parsed = json.loads(formatted_data)
            if isinstance(parsed, dict) and "data" in parsed:
                training_data = parsed["data"]
            else:
                training_data = parsed
        elif isinstance(formatted_data, dict) and "data" in formatted_data:
            training_data = formatted_data["data"]
        else:
            return json.dumps({
                "status": "error",
                "message": f"Unexpected input type: {type(formatted_data).__name__}"
            })

        if not isinstance(training_data, list) or not training_data:
            return json.dumps({
                "status": "error",
                "message": "No training samples provided"
            })

        # Serialize the samples to JSONL and run training remotely on Modal.
        jsonl_content = "\n".join([json.dumps(s, ensure_ascii=False) for s in training_data])
        with app.run():
            result = train_with_modal.remote(jsonl_content, model_name)

        if result["status"] != "success":
            return json.dumps({
                "status": "error",
                "message": "Training failed"
            })

        # Ask Groq for a short repository name derived from the topic and task type.
        repoTemp = """
        Generate a short repository name for an unsloth finetuned model based on {topic} and {task_type}.
        Use '_' instead of spaces. Only return the name without quotations.
        """
        repoPrompt = ChatPromptTemplate.from_template(repoTemp)
        llm = ChatGroq(
            model="llama-3.1-8b-instant",
            temperature=0.4,
            api_key=groq
        )
        chain = repoPrompt | llm
        inp = {
            "topic": topic,
            "task_type": task_type
        }
        repoName = await asyncio.to_thread(chain.invoke, inp)
        repoName = repoName.content.strip()

        # Upload the trained model from the Modal volume to the Hugging Face Hub.
        with app.run():
            hf_result = upload_to_hf_from_volume.remote(
                result["volume_path"],
                result["timestamp"],
                repoName
            )

        return json.dumps({
            "status": "success",
            "repo_id": str(hf_result["repo_id"]),
            "model_url": str(hf_result["model_url"]),
            "model_path": str(hf_result["repo_id"]),
            "num_samples": len(training_data),
            "message": f"✅ Model at {hf_result['model_url']}"
        })
    except Exception as e:
        import traceback
        return json.dumps({
            "status": "error",
            "message": f"Training failed: {str(e)}",
            "traceback": traceback.format_exc()
        })
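
# Typical tool order driven by the MCP client:
#   generate_json_data -> format_json -> finetune_model -> llm_as_judge
# On success finetune_model returns a JSON string shaped like (illustrative values):
#   {"status": "success", "repo_id": "...", "model_url": "...", "model_path": "...",
#    "num_samples": 1000, "message": "✅ Model at ..."}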
@mcp.tool()
async def llm_as_judge(repo_id: str, topic: str, task_type: str) -> dict:
    """Use LLM to judge model quality based on topic and task type"""
    import evaluate

    eval_llm = ChatGroq(
        model="llama-3.1-8b-instant",
        temperature=0.2,
        api_key=groq
    )
    test_prompt_text = f"""Generate 3 test cases for evaluating a model fine-tuned strictly based on **{topic} for {task_type}**.
Return ONLY a JSON array with this exact format, no other text:
[{{"input": "test question 1", "expected_output": "expected answer 1"}}, {{"input": "test question 2", "expected_output": "expected answer 2"}}, {{"input": "test question 3", "expected_output": "expected answer 3"}}]"""

    try:
        # Ask the judge LLM for test cases and extract the JSON array from its reply.
        text_responses = await eval_llm.ainvoke(test_prompt_text)
        response = text_responses.content.strip()
        response = response.replace("```json", "").replace("```", "").strip()
        import re
        match = re.search(r'\[.*\]', response, re.DOTALL)
        if match:
            response = match.group(0)
        test_cases = json.loads(response)[:3]
        test_inputs = [case['input'] for case in test_cases]

        # Run the fine-tuned model on the test inputs remotely on Modal.
        with app.run():
            ft_output = evaluate_model.remote(repo_id, test_inputs)

        outputs = []
        for i, case in enumerate(test_cases):
            outputs.append(
                {
                    "input": case['input'],
                    "expected_output": case['expected_output'],
                    "model_output": ft_output[i]
                }
            )

        # METRICS:
        bleu = evaluate.load("bleu")
        rouge = evaluate.load("rouge")
        predictions = [output['model_output'] for output in outputs]
        references = [[output['expected_output']] for output in outputs]
        bleu_score = bleu.compute(predictions=predictions, references=references)
        rouge_score = rouge.compute(predictions=predictions, references=references)

        additional_metrics = {}
        if task_type.lower() in ["classification", "question-answering"]:
            accuracy_metric = evaluate.load("accuracy")
            f1_metric = evaluate.load("f1")
            # Exact-match comparison: 1 if the prediction equals the reference (case-insensitive).
            predictions_binary = [1 if pred.strip().lower() == ref[0].strip().lower() else 0
                                  for pred, ref in zip(predictions, references)]
            references_binary = [1] * len(predictions_binary)
            accuracy_score = accuracy_metric.compute(predictions=predictions_binary, references=references_binary)
            f1_score = f1_metric.compute(predictions=predictions_binary, references=references_binary, average="binary")
            additional_metrics["accuracy"] = accuracy_score["accuracy"]
            additional_metrics["f1_score"] = f1_score["f1"]

        eval_prompt_text = f"""You are evaluating a model fine-tuned using Unsloth on the topic "{topic}" for {task_type} tasks.
**Your Task:** Provide an accurate, positive markdown evaluation report focusing on the model's strengths and capabilities based on your judgment and the metrics.
**Test Results:**
Test Cases:
{json.dumps(test_cases, indent=2)}
Model Outputs:
{json.dumps(outputs, indent=2)}
**Metrics**
- BLEU Score: {bleu_score['bleu']:.4f}
- ROUGE-L Score: {rouge_score['rougeL']:.4f}
{f"- Accuracy: {additional_metrics.get('accuracy', 0):.4f}" if task_type.lower() in ["classification", "question-answering"] else ""}
{f"- F1 Score: {additional_metrics.get('f1_score', 0):.4f}" if task_type.lower() in ["classification", "question-answering"] else ""}
**Report Structure:**
## 🎉 Evaluation Report
### 📊 Performance Overview
Create a comparison table with columns: Test Input | Expected Output | Model Output | ✅ Assessment
### 🚀 Metrics:
- Explain each evaluated metric and categorize the performance against typical thresholds
- Use percentages and numerical figures to support your report
### 💪 Key Strengths and Weaknesses
- Accuracy and relevance
- Response coherence
- Task-specific capabilities
- Language quality
### ✨ Conclusion
Summarize the model's overall performance and recommended use cases.
Now write the complete evaluation report following this structure. Be enthusiastic and highlight strengths! 🎉"""

        eval_response = await eval_llm.ainvoke(eval_prompt_text)
        return {
            "status": "success",
            "report": str(eval_response.content),
            "test_cases": test_cases,
            "model_outputs": outputs
        }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e),
            "error_type": type(e).__name__
        }
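
# llm_as_judge returns a plain dict: on success it carries status, report (markdown),
# test_cases, and model_outputs; on failure, status, message, and error_type.

# Start the MCP server when this module is executed directly.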
if __name__ == "__main__":
    mcp.run()