How exactly is Hermes Agent's self-evolution mechanism implemented?
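Hermes Agent's "self-evolution" is not achieved by fine-tuning model weights. It relies instead on **"prompt backpropagation" (KEPA)** combined with a **"Closed Learning Loop"**. In essence, it improves performance by optimizing **the strategy for using the LLM**, while the LLM's underlying parameters stay unchanged.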
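1. Core mechanism: KEPA (prompt backpropagation)
You can think of it as an analogue of backpropagation in a neural network, except that what gets updated is the Agent's configuration, not the model's parameters.
- Conventional backpropagation: compute the loss → update the model weights.
- Hermes' KEPA: analyze execution traces (failures or inefficiencies) → update prompts, tool-calling strategies, and skill definitions.
- Result: the Agent learns "how to phrase things and which tools to call the next time a similar situation comes up"; the model itself does not get any stronger.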
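A minimal sketch of the analogy above, assuming a skill is plain text (steps plus a list of pitfalls) and that the "reflection" step, which a real agent would delegate to an LLM, is replaced by a fixed string template. All names here (`SkillDoc`, `TraceStep`, `backpropagate`) are hypothetical and not part of Hermes' API. The point it illustrates: only the text the agent reads next time changes; no model parameter is touched.

```kotlin
// Hypothetical minimal model: a skill is just text the agent reads before acting.
data class SkillDoc(
    val name: String,
    val steps: List<String>,
    val pitfalls: List<String> = emptyList()
)

// One recorded step of an execution trace.
data class TraceStep(val tool: String, val ok: Boolean, val error: String? = null)

/**
 * "Backpropagate" a trace into the skill text: find the first failed step and turn it
 * into a pitfall line. A real system would ask an LLM to write the lesson; a fixed
 * template stands in for that here. Model weights are never involved.
 */
fun backpropagate(skill: SkillDoc, trace: List<TraceStep>): SkillDoc {
    // A clean run carries no error signal, so the skill is returned unchanged.
    val failed = trace.firstOrNull { !it.ok } ?: return skill
    val lesson = "When calling ${failed.tool}: ${failed.error ?: "unknown error"}"
    // Skip lessons already recorded so repeated failures do not bloat the skill.
    return if (lesson in skill.pitfalls) skill else skill.copy(pitfalls = skill.pitfalls + lesson)
}
```

Feeding the same failed trace in twice leaves the skill unchanged, so a recurring failure is recorded only once.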
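2. The closed learning flywheel (A Closed Learning Loop)
This is the concrete workflow that drives evolution. It has four stages:
- Execute: the Agent handles tasks normally and records the complete tool-call chain and reasoning.
- Evaluate: the system decides, automatically or from user feedback (such as corrections), whether the execution was successful and efficient.
- Extract: successful execution patterns are automatically packaged into reusable Skills. A Skill is a Markdown file containing steps, parameters, and a list of pitfalls to avoid.
- Refine: the next time a similar task comes up, the Agent retrieves and invokes the Skill directly instead of reasoning from scratch. If the result is poor, the Skill description is optimized again.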
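The four stages can be sketched as a single function over an in-memory library, assuming a task type is a plain string key and a skill is just an ordered list of steps. `SkillLibrary`, `Outcome`, and `runTask` are hypothetical names, and evaluation is reduced to a boolean where Hermes would use automatic checks or user feedback.

```kotlin
// Hypothetical in-memory skill library: task type -> ordered steps.
class SkillLibrary {
    private val skills = mutableMapOf<String, List<String>>()
    fun lookup(taskType: String): List<String>? = skills[taskType]
    fun store(taskType: String, steps: List<String>) {
        skills[taskType] = steps
    }
}

// Result of one execution: the steps actually taken and whether they worked.
data class Outcome(val steps: List<String>, val success: Boolean)

/**
 * One pass through Execute -> Evaluate -> Extract -> Refine.
 * solveFromScratch: execute with no prior skill.
 * solveWithSkill: execute guided by a stored skill (Refine: reuse instead of re-deriving).
 */
fun runTask(
    taskType: String,
    library: SkillLibrary,
    solveFromScratch: () -> Outcome,
    solveWithSkill: (List<String>) -> Outcome
): Outcome {
    val existing = library.lookup(taskType)
    // Execute: reuse the stored skill when there is one.
    val outcome = if (existing != null) solveWithSkill(existing) else solveFromScratch()
    // Evaluate + Extract: only successful runs are promoted into (or replace) the skill;
    // a failed run leaves the previous skill in place for a later review to improve.
    if (outcome.success) library.store(taskType, outcome.steps)
    return outcome
}
```

On the second call for the same task type the library is hit, so the from-scratch path is skipped and the successful, refined steps overwrite the old ones.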
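3. When it triggers: asynchronous background review (Nudge)
Evolution happens automatically and invisibly, driven by two counters:
- Skill Nudge: by default, triggered after every 15 tool calls. The system forks a review Agent in the background, which analyzes recent tasks and creates or updates Skills.
- Memory Nudge: by default, triggered after every 10 dialogue rounds. It summarizes user preferences and important facts and writes them to persistent memory (MEMORY.md/USER.md).
- Characteristic: this runs on a background thread and does not block the main conversation, so the user does not notice it.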
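The two counters can be sketched with `AtomicInteger` (safe to increment from several threads), assuming the 15/10 defaults stated above. `NudgeCounters` is a hypothetical name. For simplicity the callbacks run on the caller's thread; to keep the main conversation unblocked, a real implementation would hand each review to a background worker.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

/**
 * Counter-based nudges: fire onSkillNudge every [skillInterval] tool calls and
 * onMemoryNudge every [memoryInterval] dialogue rounds. The callbacks receive the
 * running total that triggered them.
 */
class NudgeCounters(
    private val skillInterval: Int = 15,
    private val memoryInterval: Int = 10,
    private val onSkillNudge: (Int) -> Unit,
    private val onMemoryNudge: (Int) -> Unit
) {
    private val toolCalls = AtomicInteger(0)
    private val dialogRounds = AtomicInteger(0)

    // Call once per tool invocation.
    fun recordToolCall() {
        val n = toolCalls.incrementAndGet()
        if (n % skillInterval == 0) onSkillNudge(n)
    }

    // Call once per completed dialogue round.
    fun recordDialogRound() {
        val n = dialogRounds.incrementAndGet()
        if (n % memoryInterval == 0) onMemoryNudge(n)
    }
}
```

With the defaults, 31 tool calls fire the skill nudge at 15 and 30, and 20 dialogue rounds fire the memory nudge at 10 and 20.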
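4. How the products of evolution are stored
- Skill library (Skills): stored under ~/.hermes/skills/. This is "procedural memory", which tells the Agent "how to do things".
- Memory store (Memory): backed by SQLite with FTS5 full-text search. This is "declarative memory", which lets the Agent remember "who you are" and "what has happened".
- Configuration tuning: the system prompts and workflow descriptions that the KEPA mechanism keeps adjusting.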
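To illustrate a skill stored as a Markdown file, the sketch below renders steps, parameters, and pitfalls and writes them under a given root directory. The section layout and the `<name>.md` file naming are assumptions, not Hermes' documented format; `SkillFile`, `toMarkdown`, and `saveUnder` are hypothetical names. In practice the root would be something like `Path.of(System.getProperty("user.home"), ".hermes", "skills")`.

```kotlin
import java.nio.file.Files
import java.nio.file.Path

data class SkillFile(
    val name: String,
    val description: String,
    val steps: List<String>,
    val parameters: Map<String, String>,
    val pitfalls: List<String>
)

/** Render the skill as Markdown (assumed layout: title, description, three sections). */
fun SkillFile.toMarkdown(): String = buildString {
    appendLine("# $name")
    appendLine()
    appendLine(description)
    appendLine()
    appendLine("## Steps")
    steps.forEachIndexed { i, step -> appendLine("${i + 1}. $step") }
    appendLine()
    appendLine("## Parameters")
    parameters.forEach { (k, v) -> appendLine("- `${k}`: $v") }
    appendLine()
    appendLine("## Pitfalls")
    pitfalls.forEach { appendLine("- $it") }
}

/** Write the skill to <root>/<name>.md, creating the directory if needed; returns the file path. */
fun SkillFile.saveUnder(root: Path): Path {
    Files.createDirectories(root)
    val file = root.resolve("$name.md")
    Files.writeString(file, toMarkdown())
    return file
}
```

Keeping skills as plain Markdown means the agent can load them straight into a prompt and a human can read or edit them with any text editor.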
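Summary
Hermes Agent's self-evolution = (KEPA optimizing strategy) + (closed learning loop generating Skills) + (persistent memory preventing forgetting). By turning "experience" into "reusable procedures (Skills)" and "optimized instructions (Prompts)", it gets smarter the more it is used, without relying on expensive model retraining.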
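What language or framework is Hermes Agent developed with?
Hermes Agent is an open-source project built in pure Python. It does not depend on a complex front-end framework; its core design revolves around a CLI (command line) and backend services.
1. Core technology and language stack
- Language: Python 3.11+. This is the core of the entire framework; the code is readable and easy to extend.
- Auxiliary runtime: some browser-automation tools and certain gateway bridges use Node.js/JavaScript, but these are only peripheral dependencies of the main Agent logic.
2. Key dependencies (Python stack)
Rather than a heavyweight web framework such as Django or Flask, it is built on a set of modern Python libraries: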
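| Module | Main libraries | Notes |
|---|---|---|
| LLM interaction | openai, anthropic | Supports the OpenAI API format and multiple model backends |
| Data structures | pydantic | Strongly typed data validation and configuration management |
| Networking | httpx | Async HTTP client used for tool calls |
| CLI/TUI | rich, prompt_toolkit | Builds a polished terminal interface |
| Configuration parsing | pyyaml, jinja2 | Handles YAML configuration and dynamic prompt templates |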
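3. Architectural characteristics and comparison
- Lightweight design: unlike frameworks built on Node.js/TypeScript such as OpenClaw, Hermes is designed to run directly in the terminal, which keeps the architecture simpler.
- Performance: the core paths are performance-oriented; some components with very demanding performance requirements (such as certain tools) may use Rust extensions, but ordinary users never need to touch them.
If you are deploying locally in Chengdu (e.g. on macOS/Linux or WSL2), you only need a working Python 3.11+ and pip environment; you can then install it with the official one-line script, with no complex web front end to configure.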
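How would I implement the key logic in Kotlin?
Implementing Hermes Agent's "self-evolution" logic in Kotlin is challenging but feasible. Below I use a layered architecture and show how Kotlin features such as coroutines and DSLs can implement its key modules.
I. Core architecture design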
// 1. Domain model layer
import java.time.Duration
import java.time.Instant
data class Skill(
val id: String,
val name: String,
val description: String,
val steps: List<Step>,
val successCount: Int = 0,
val lastUsed: Instant = Instant.now()
)
data class MemoryEntry(
val key: String,
val value: String,
val embedding: FloatArray, // used for semantic retrieval
val timestamp: Instant
)
data class ExecutionTrace(
val taskId: String,
val steps: List<ExecutionStep>,
val success: Boolean,
val duration: Duration,
val feedback: String? = null
)
II. KEPA mechanism implementation (key logic)
// 2. KEPA engine (prompt backpropagation)
class KepaEngine(
private val skillRepo: SkillRepository,
private val llmClient: LLMClient
) {
// Asynchronously analyze an execution trace and refine skills
suspend fun analyzeAndRefine(trace: ExecutionTrace): Skill? {
return withContext(Dispatchers.IO) {
// 1. Extract the failure pattern
val failurePattern = extractFailurePattern(trace)
if (failurePattern == null) return@withContext null
// 2. Generate an optimization suggestion (LLM call)
val optimization = llmClient.generateRefinement(
prompt = buildRefinementPrompt(trace, failurePattern)
)
// 3. Create or update the skill
skillRepo.findSimilarSkill(trace.taskId)?.let { existingSkill ->
val refinedSkill = existingSkill.refineWith(optimization)
skillRepo.update(refinedSkill)
refinedSkill
} ?: run {
val newSkill = Skill(
id = generateSkillId(trace),
name = extractSkillName(trace),
description = optimization.description,
steps = optimization.steps
)
skillRepo.save(newSkill)
newSkill
}
}
}
// Extract the failure pattern (streamed with a Kotlin Sequence)
private fun extractFailurePattern(trace: ExecutionTrace): FailurePattern? {
return trace.steps
.asSequence()
.filter { it.status == StepStatus.FAILED }
.map { step ->
FailurePattern(
tool = step.toolName,
error = step.error,
context = step.context
)
}
.firstOrNull()
}
}
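III. Closed learning loop implementation
// 3. Learning loop coordinator
class ClosedLearningLoop(
private val kepaEngine: KepaEngine,
private val memoryManager: MemoryManager,
private val skillTrigger: NudgeTrigger
) {
// Event-driven learning loop built on Flow.
// The two triggers are collected in separate child coroutines: collect on an endless
// flow never returns, so collecting them one after the other would starve the second.
val learningFlow: Flow<LearningEvent> = channelFlow {
// Skill trigger: every 15th tool call. This is count-based; Flow.sample() takes a
// time period in milliseconds, so sample(15) would not mean "every 15 calls".
launch {
skillTrigger.toolCallCountFlow
.filter { count -> count % 15 == 0 }
.collect {
val recentTraces = memoryManager.getRecentTraces(limit = 5)
recentTraces.forEach { trace ->
kepaEngine.analyzeAndRefine(trace)?.let { skill ->
send(LearningEvent.SkillCreated(skill))
}
}
}
}
// Memory trigger: every 10th dialogue round
launch {
skillTrigger.dialogRoundFlow
.filter { round -> round % 10 == 0 }
.collect {
val summary = memoryManager.summarizeRecentDialog()
send(LearningEvent.MemoryUpdated(summary))
}
}
}
// Start the background learning job and return its Job so callers can cancel it
fun startLearningJob(): Job = CoroutineScope(Dispatchers.Default).launch {
learningFlow.collect { event ->
when (event) {
is LearningEvent.SkillCreated -> {
logSkillCreation(event.skill)
// KepaEngine.analyzeAndRefine has already saved the skill to the repository
}
is LearningEvent.MemoryUpdated -> {
updateLongTermMemory(event.summary)
}
}
}
}
}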
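IV. Asynchronous background review (Nudge) implementation
// 4. Review trigger (Kotlin coroutines SharedFlow)
// A SharedFlow can have several collectors (the learning loop and the review service),
// whereas Channel.consumeAsFlow() may only be collected once.
class NudgeTrigger {
private val toolCalls = AtomicInteger(0)
private val dialogRounds = AtomicInteger(0)
private val _toolCallCountFlow = MutableSharedFlow<Int>(extraBufferCapacity = 64)
private val _dialogRoundFlow = MutableSharedFlow<Int>(extraBufferCapacity = 64)
// Emits the running total after each tool call
val toolCallCountFlow: SharedFlow<Int> = _toolCallCountFlow.asSharedFlow()
// Emits the running total after each dialogue round
val dialogRoundFlow: SharedFlow<Int> = _dialogRoundFlow.asSharedFlow()
// Call after every tool call
suspend fun notifyToolCall() {
_toolCallCountFlow.emit(toolCalls.incrementAndGet())
}
// Call after every dialogue round
suspend fun notifyDialogRound() {
_dialogRoundFlow.emit(dialogRounds.incrementAndGet())
}
}
// 5. Asynchronous review service
class AsyncReviewService(
private val nudgeTrigger: NudgeTrigger
) {
// Long-lived scope for background work; SupervisorJob keeps one failed review from cancelling the others
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
init {
// Start the background watcher coroutines
scope.launch { watchSkillNudges() }
scope.launch { watchMemoryNudges() }
}
private suspend fun watchSkillNudges() {
nudgeTrigger.toolCallCountFlow
.collect { count ->
if (count % 15 == 0) {
// Trigger a skill review
triggerSkillReview()
}
}
}
private fun triggerSkillReview() {
// Fork the review into its own coroutine: neither the watcher nor the main conversation waits for it
// (supervisorScope would suspend the caller until the review finished)
scope.launch {
val reviewAgent = ReviewAgent() // independent review instance
reviewAgent.analyzeRecentTasks()
// Results are written to the shared skill library
}
}
}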
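The "fork a review without blocking" idea does not depend on coroutines; a minimal JDK-only sketch uses a single-thread executor. `BackgroundReviewer` is a hypothetical name, and the thread name `skill-review` is arbitrary. A single daemon worker also serializes reviews, so two reviews never write the skill store at the same time.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit

/**
 * Runs reviews on one background daemon thread named "skill-review".
 * submit() only enqueues work and returns at once, so the caller (the main
 * conversation) never waits for a review to finish.
 */
class BackgroundReviewer : AutoCloseable {
    private val worker: ExecutorService = Executors.newSingleThreadExecutor { task ->
        Thread(task, "skill-review").apply { isDaemon = true }
    }

    /** Enqueue a review; the Future is only for callers that want to observe the result. */
    fun <T> submit(review: () -> T): Future<T> = worker.submit(Callable { review() })

    /** Stop accepting reviews and give queued ones up to 5 seconds to finish. */
    override fun close() {
        worker.shutdown()
        worker.awaitTermination(5, TimeUnit.SECONDS)
    }
}
```

Using a daemon thread means a pending review never keeps the JVM alive on exit; if reviews must always complete, call `close()` from a shutdown path instead.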
V. Skill persistence and retrieval
// 6. Skill storage (kotlinx.serialization + SQLDelight)
@Serializable
data class StoredSkill(
val id: String,
val name: String,
val content: String, // Markdown
val embedding: FloatArray,
val metadata: Map<String, String>
)
class SkillRepository(
private val db: SkillsDatabase,
private val embeddingService: EmbeddingService
) {
// Reactive query exposed as a Flow
fun findRelevantSkills(query: String, limit: Int = 5): Flow<List<Skill>> = flow {
// 1. Semantic retrieval via embeddings
// (findSimilar is a custom query: SQLite has no built-in vector search, so it needs an extension or in-memory ranking)
val queryEmbedding = embeddingService.encode(query)
val similarSkills = db.skillQueries.findSimilar(
embedding = queryEmbedding,
limit = limit.toLong()
).executeAsList()
// 2. Map to domain objects
val skills = similarSkills.map { it.toDomain() }
emit(skills)
}
// Incremental skill update via the data-class copy() function
suspend fun refineSkill(skillId: String, refinement: String): StoredSkill {
val existing = db.getSkillById(skillId)
val refinedCount = (existing.metadata["refined_count"]?.toIntOrNull() ?: 0) + 1
val updated = existing.copy(
content = mergeContent(existing.content, refinement),
metadata = existing.metadata + mapOf(
"refined_at" to Instant.now().toString(),
"refined_count" to refinedCount.toString() // metadata values are Strings
)
)
db.updateSkill(updated)
return updated
}
}
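`findSimilar` above is assumed to exist; since SQLite has no built-in vector search, a brute-force cosine ranking in memory is a reasonable stand-in for a small skill library. A minimal sketch follows; `IndexedSkill`, `cosine`, and `topK` are hypothetical names.

```kotlin
import kotlin.math.sqrt

// A skill id with its embedding vector (hypothetical stand-in for a stored row).
data class IndexedSkill(val id: String, val embedding: FloatArray)

/** Cosine similarity in [-1, 1]; defined as 0.0 if either vector is all zeros. */
fun cosine(a: FloatArray, b: FloatArray): Double {
    require(a.size == b.size) { "dimension mismatch: ${a.size} vs ${b.size}" }
    var dot = 0.0
    var normA = 0.0
    var normB = 0.0
    for (i in a.indices) {
        // Accumulate in Double to limit Float rounding on long vectors.
        dot += a[i].toDouble() * b[i]
        normA += a[i].toDouble() * a[i]
        normB += b[i].toDouble() * b[i]
    }
    return if (normA == 0.0 || normB == 0.0) 0.0 else dot / (sqrt(normA) * sqrt(normB))
}

/** Brute-force top-k by cosine similarity, dropping hits below minScore. */
fun topK(
    query: FloatArray,
    skills: List<IndexedSkill>,
    k: Int,
    minScore: Double = 0.0
): List<Pair<String, Double>> =
    skills.asSequence()
        .map { it.id to cosine(query, it.embedding) }
        .filter { it.second >= minScore }
        .sortedByDescending { it.second }
        .take(k)
        .toList()
```

Each query costs one pass over all stored vectors (n skills × d dimensions), which is cheap for a personal skill library; an approximate-nearest-neighbour index only pays off once the library grows much larger.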
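VI. Complete startup example
// 7. Launcher for the Kotlin version of Hermes Agent
object HermesAgentKotlin {
@JvmStatic
fun main(args: Array<String>) = runBlocking {
// Initialize components (skillRepo, llmClient and memoryManager are assumed to be constructed beforehand; not shown)
val nudgeTrigger = NudgeTrigger()
val kepaEngine = KepaEngine(skillRepo, llmClient)
val learningLoop = ClosedLearningLoop(kepaEngine, memoryManager, nudgeTrigger)
// Start the background learning job; startLearningJob() launches its own coroutine
// and returns that Job, so cancelling it really stops the learning loop
val learningJob = learningLoop.startLearningJob()
// Main conversation loop
val agent = ConversationalAgent(nudgeTrigger)
launch {
agent.startInteractiveSession()
}
// Graceful shutdown
Runtime.getRuntime().addShutdownHook(Thread {
runBlocking {
learningJob.cancel()
agent.shutdown()
}
})
}
}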
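VII. Advantages of a Kotlin implementation
- Coroutine support: a natural fit for asynchronous reviews and background learning tasks
- Flow API: expresses the event-driven trigger mechanism cleanly
- Immutable data: data class + copy() makes each step of a skill's evolution traceable
- DSL capabilities: a domain-specific language can be defined to describe skills
- Type safety: catches many errors at compile time instead of at runtime
- Multiplatform: can target Native, JS, Android, and more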
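VIII. Differences from the original Python version
| Aspect | Original (Python) | Kotlin implementation |
|---|---|---|
| Concurrency model | asyncio event loop | Kotlin coroutines (structured concurrency) |
| Persistence | SQLite + FTS5 | SQLDelight, or Room on Android (type-safe) |
| Skill retrieval | Simple vector search | Can integrate Milvus/Weaviate |
| Deployment | Pure Python environment | Can be compiled to a native binary (GraalVM native-image or Kotlin/Native, subject to library support) |
| Ecosystem | Rich AI libraries | Mature for enterprise use, but fewer AI libraries |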
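If needed, I can go further and provide:
- A complete Gradle dependency configuration
- Integration code for the Milvus/Chroma vector databases
- A Ktor-based Web Gateway implementation
- A skill-management UI for Android
Next, I'll implement a more complete Kotlin version of the key Hermes Agent components, covering database integration, vector retrieval, and a tool-calling framework.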
I. Project structure and Gradle configuration
1.1 build.gradle.kts
plugins {
kotlin("jvm") version "1.9.23"
kotlin("plugin.serialization") version "1.9.23"
application
}
group = "com.hermes.agent"
version = "1.0.0"
repositories {
mavenCentral()
google()
}
dependencies {
// Kotlin
implementation(kotlin("stdlib"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.3")
// AI/ML
implementation("com.aallam.openai:openai-client:3.7.0")
implementation("io.ktor:ktor-client-core:2.3.9")
implementation("io.ktor:ktor-client-cio:2.3.9")
implementation("ai.djl:api:0.25.0")
implementation("ai.djl.huggingface:tokenizers:0.25.0")
// Database
implementation("org.jetbrains.exposed:exposed-core:0.45.0")
implementation("org.jetbrains.exposed:exposed-jdbc:0.45.0")
implementation("org.jetbrains.exposed:exposed-dao:0.45.0")
implementation("com.h2database:h2:2.2.224")
implementation("org.postgresql:postgresql:42.7.3")
// Vector database
implementation("io.milvus:milvus-sdk-java:2.3.8")
// CLI parsing (Clikt) and HTTP client (OkHttp) for tools
implementation("com.github.ajalt.clikt:clikt:4.2.2")
implementation("com.squareup.okhttp3:okhttp:4.12.0")
// Logging
implementation("ch.qos.logback:logback-classic:1.4.14")
implementation("io.github.oshai:kotlin-logging-jvm:5.1.0")
// Testing
testImplementation(kotlin("test"))
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.0")
}
application {
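// main() is a @JvmStatic member of object HermesAgentKotlin (see Main.kt), so that object is the entry class
mainClass.set("com.hermes.agent.HermesAgentKotlin")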
}
tasks.test {
useJUnitPlatform()
}
II. Core domain model
2.1 domain/Models.kt
@file:UseSerializers(InstantSerializer::class)
package com.hermes.agent.domain
import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.UseSerializers
import kotlinx.serialization.descriptors.PrimitiveKind
import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import kotlinx.serialization.json.JsonElement
import java.time.Instant
import java.util.UUID
// java.time.Instant has no built-in serializer; encode it as an ISO-8601 string.
// Free-form values use JsonElement instead of Any, which kotlinx.serialization cannot serialize.
object InstantSerializer : KSerializer<Instant> {
override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("java.time.Instant", PrimitiveKind.STRING)
override fun serialize(encoder: Encoder, value: Instant) = encoder.encodeString(value.toString())
override fun deserialize(decoder: Decoder): Instant = Instant.parse(decoder.decodeString())
}
@Serializable
data class Message(
val id: String = UUID.randomUUID().toString(),
val role: Role,
val content: String,
val timestamp: Instant = Instant.now(),
val metadata: Map<String, String> = emptyMap()
) {
enum class Role { SYSTEM, USER, ASSISTANT, TOOL }
}
@Serializable
data class ToolCall(
val id: String = UUID.randomUUID().toString(),
val name: String,
val arguments: Map<String, JsonElement>,
val result: ToolResult? = null
)
@Serializable
sealed class ToolResult {
@Serializable
data class Success(val data: String, val metadata: Map<String, JsonElement> = emptyMap()) : ToolResult()
@Serializable
data class Error(
val error: String,
val stackTrace: String? = null,
val recoverable: Boolean = true
) : ToolResult()
}
@Serializable
data class Skill(
val id: String = UUID.randomUUID().toString(),
val name: String,
val description: String,
val version: Int = 1,
val steps: List<SkillStep>,
val examples: List<SkillExample> = emptyList(),
val successCount: Int = 0,
val failureCount: Int = 0,
val tags: Set<String> = emptySet(),
val embeddings: List<Float> = emptyList(),
val createdAt: Instant = Instant.now(),
val lastUsed: Instant? = null,
val metadata: Map<String, String> = emptyMap()
) {
val successRate: Double
get() = if (successCount + failureCount == 0) 0.0
else successCount.toDouble() / (successCount + failureCount)
fun refined(refinement: SkillRefinement): Skill = copy(
steps = steps + refinement.newSteps,
version = version + 1,
examples = examples + refinement.newExamples,
metadata = metadata + mapOf(
"refined_at" to Instant.now().toString(),
"refinement_reason" to refinement.reason
)
)
}
@Serializable
data class SkillStep(
val order: Int,
val action: String, // tool call, llm reasoning, etc
val parameters: Map<String, JsonElement>,
val description: String,
val conditions: List<String> = emptyList()
)
@Serializable
data class SkillExample(
val input: String,
val output: String,
val trace: ExecutionTrace? = null
)
@Serializable
data class SkillRefinement(
val skillId: String,
val reason: String,
val newSteps: List<SkillStep>,
val newExamples: List<SkillExample>,
val triggerTrace: ExecutionTrace
)
@Serializable
data class ExecutionTrace(
val id: String = UUID.randomUUID().toString(),
val taskId: String,
val sessionId: String,
val steps: List<ExecutionStep>,
val startTime: Instant,
val endTime: Instant? = null,
val success: Boolean? = null,
val skillIds: List<String> = emptyList(),
val metadata: Map<String, String> = emptyMap()
) {
val duration: java.time.Duration
get() = java.time.Duration.between(startTime, endTime ?: Instant.now())
}
@Serializable
data class ExecutionStep(
val id: String = UUID.randomUUID().toString(),
val order: Int,
val toolName: String? = null,
val toolArguments: Map<String, JsonElement> = emptyMap(),
val toolResult: ToolResult? = null,
val reasoning: String? = null,
val status: StepStatus,
val timestamp: Instant = Instant.now(),
val durationMs: Long? = null
) {
enum class StepStatus { PENDING, RUNNING, SUCCESS, FAILED, SKIPPED }
}
@Serializable
data class MemoryEntry(
val id: String = UUID.randomUUID().toString(),
val userId: String? = null,
val sessionId: String? = null,
val key: String,
val value: String,
val type: MemoryType,
val embeddings: List<Float>,
val importance: Float = 0.5f, // importance score in [0, 1]
val accessedCount: Int = 0,
val lastAccessed: Instant = Instant.now(),
val createdAt: Instant = Instant.now(),
val expiresAt: Instant? = null
) {
enum class MemoryType { FACT, PREFERENCE, SKILL, RELATIONSHIP, EVENT }
fun shouldForget(now: Instant = Instant.now()): Boolean {
return expiresAt?.let { now.isAfter(it) } ?: false
}
}
III. Full KEPA engine implementation
3.1 engine/KepaEngine.kt
package com.hermes.agent.engine
import com.hermes.agent.domain.*
import com.hermes.agent.llm.LLMClient
import com.hermes.agent.storage.SkillRepository
import kotlinx.coroutines.*
import kotlinx.serialization.json.*
import kotlinx.serialization.encodeToString
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
private val logger = KotlinLogging.logger {}
private val json = Json { prettyPrint = true }
class KepaEngine(
private val llmClient: LLMClient,
private val skillRepo: SkillRepository,
private val embeddingService: EmbeddingService
) {
data class FailurePattern(
val tool: String,
val error: String,
val context: Map<String, Any>,
val stepOrder: Int
)
data class OptimizationSuggestion(
val skillName: String,
val skillDescription: String,
val newSteps: List<SkillStep>,
val newExamples: List<SkillExample>,
val reasoning: String,
val confidence: Float
)
suspend fun analyzeAndRefine(trace: ExecutionTrace): Skill? = withContext(Dispatchers.IO) {
try {
logger.debug { "Analyzing trace ${trace.id} for skill refinement" }
// 1. Extract the failure pattern
val failurePattern = extractFailurePattern(trace)
if (failurePattern == null) {
logger.debug { "No clear failure pattern found in trace ${trace.id}" }
return@withContext null
}
// 2. Extract success patterns (if any)
val successPatterns = extractSuccessPatterns(trace)
// 3. Generate an optimization suggestion
val optimization = generateOptimization(trace, failurePattern, successPatterns)
// 4. Validate the suggestion
if (!validateOptimization(optimization)) {
logger.warn { "Optimization validation failed for trace ${trace.id}" }
return@withContext null
}
// 5. Find an existing skill or create a new one
val skill = findOrCreateSkill(trace, optimization)
// 6. Apply the optimization
val refinedSkill = applyOptimization(skill, optimization, trace)
// 7. Recompute the embedding
val newEmbedding = embeddingService.generateSkillEmbedding(refinedSkill)
val finalSkill = refinedSkill.copy(embeddings = newEmbedding)
// 8. Save
skillRepo.save(finalSkill)
logger.info {
"Skill ${finalSkill.name} refined (v${finalSkill.version}) " +
"based on trace ${trace.id}"
}
return@withContext finalSkill
} catch (e: CancellationException) {
// Never swallow cancellation (e.g. a review timeout); let it propagate
throw e
} catch (e: Exception) {
logger.error(e) { "Failed to analyze and refine trace ${trace.id}" }
return@withContext null
}
}
private fun extractFailurePattern(trace: ExecutionTrace): FailurePattern? {
val failedStep = trace.steps.firstOrNull { it.status == ExecutionStep.StepStatus.FAILED }
return failedStep?.let { step ->
FailurePattern(
tool = step.toolName ?: "unknown",
error = (step.toolResult as? ToolResult.Error)?.error ?: "Unknown error",
context = buildStepContext(trace, step.order),
stepOrder = step.order
)
}
}
private fun extractSuccessPatterns(trace: ExecutionTrace): List<Map<String, Any>> {
return trace.steps
.filter { it.status == ExecutionStep.StepStatus.SUCCESS }
.map { step ->
// Default the nullable fields so the map fits Map<String, Any>
mapOf(
"tool" to (step.toolName ?: "reasoning"),
"arguments" to step.toolArguments,
"result" to ((step.toolResult as? ToolResult.Success)?.data ?: ""),
"context" to buildStepContext(trace, step.order)
)
}
}
private suspend fun generateOptimization(
trace: ExecutionTrace,
failure: FailurePattern,
successes: List<Map<String, Any>>
): OptimizationSuggestion = withContext(Dispatchers.IO) {
val prompt = buildOptimizationPrompt(trace, failure, successes)
val response = llmClient.complete(
messages = listOf(
Message(
role = Message.Role.SYSTEM,
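content = """You are a skill-optimization expert. Analyze the task execution trace and propose concrete skill improvements.
The skill should: 1) fix the failure pattern you found 2) contain clear steps 3) include examples
Respond with JSON in exactly this shape: {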
"skillName": "string",
"skillDescription": "string",
"newSteps": [{"order": 1, "action": "...", "parameters": {...}, "description": "..."}],
"newExamples": [{"input": "...", "output": "..."}],
"reasoning": "string",
"confidence": 0.95
}"""
),
Message(
role = Message.Role.USER,
content = prompt
)
),
temperature = 0.3
)
try {
val jsonResponse = Json.decodeFromString<JsonObject>(response)
OptimizationSuggestion(
skillName = jsonResponse["skillName"]!!.jsonPrimitive.content,
skillDescription = jsonResponse["skillDescription"]!!.jsonPrimitive.content,
newSteps = parseSteps(jsonResponse["newSteps"]!!.jsonArray),
newExamples = parseExamples(jsonResponse["newExamples"]!!.jsonArray),
reasoning = jsonResponse["reasoning"]!!.jsonPrimitive.content,
confidence = jsonResponse["confidence"]!!.jsonPrimitive.float
)
} catch (e: Exception) {
throw IllegalArgumentException("Failed to parse LLM response: $response", e)
}
}
private fun parseSteps(jsonArray: JsonArray): List<SkillStep> {
return jsonArray.map { jsonElement ->
val obj = jsonElement.jsonObject
SkillStep(
order = obj["order"]!!.jsonPrimitive.int,
action = obj["action"]!!.jsonPrimitive.content,
parameters = obj["parameters"] as? JsonObject ?: JsonObject(emptyMap()), // JsonObject is already a Map<String, JsonElement>
description = obj["description"]!!.jsonPrimitive.content,
conditions = (obj["conditions"] as? JsonArray)?.map { it.jsonPrimitive.content } ?: emptyList()
)
}
}
private fun JsonObject.toMap(): Map<String, Any> {
return this.mapValues { (_, value) ->
when (value) {
is JsonPrimitive -> {
when {
value.isString -> value.content
value.booleanOrNull != null -> value.boolean
value.intOrNull != null -> value.int
value.longOrNull != null -> value.long
value.doubleOrNull != null -> value.double
else -> value.content
}
}
is JsonArray -> value.map { (it as JsonPrimitive).content }
else -> value.toString()
}
}
}
private fun findOrCreateSkill(
trace: ExecutionTrace,
optimization: OptimizationSuggestion
): Skill {
// Compute the description embedding once instead of once per candidate
val descriptionEmbedding = embeddingService.generateTextEmbedding(optimization.skillDescription)
// 1. Look for similar skills
val similarSkills = skillRepo.findSimilar(optimization.skillDescription, limit = 3)
// 2. If one is highly similar, reuse it
similarSkills.firstOrNull { skill ->
embeddingService.cosineSimilarity(skill.embeddings, descriptionEmbedding) > 0.85
}?.let { return it }
// 3. Otherwise start an empty skill at version 0; applyOptimization then adds the
// suggested steps and examples exactly once and bumps it to version 1
return Skill(
name = optimization.skillName,
description = optimization.skillDescription,
version = 0,
steps = emptyList(),
embeddings = descriptionEmbedding
)
}
private fun applyOptimization(
skill: Skill,
optimization: OptimizationSuggestion,
trace: ExecutionTrace
): Skill {
val refinement = SkillRefinement(
skillId = skill.id,
reason = "Automatic refinement: ${optimization.reasoning}",
newSteps = optimization.newSteps,
newExamples = optimization.newExamples,
triggerTrace = trace
)
return skill.refined(refinement)
}
// Helpers
private fun buildStepContext(trace: ExecutionTrace, stepOrder: Int): Map<String, Any> {
val previousSteps = trace.steps
.filter { it.order < stepOrder }
.sortedBy { it.order }
.takeLast(3) // only the 3 most recent preceding steps as context
return mapOf(
"previous_steps" to previousSteps.map { it.toolName ?: "reasoning" },
"task_id" to trace.taskId,
"session_id" to trace.sessionId
)
}
private fun buildOptimizationPrompt(
trace: ExecutionTrace,
failure: FailurePattern,
successes: List<Map<String, Any>>
): String {
return """
## Task execution trace analysis
Task ID: ${trace.taskId}
Session ID: ${trace.sessionId}
Duration: ${trace.duration.toMillis()}ms
Succeeded: ${trace.success ?: "unknown"}
## Failed step (step ${failure.stepOrder})
Tool: ${failure.tool}
Error: ${failure.error}
Context: ${failure.context}
## Successful steps
${if (successes.isNotEmpty()) successes.joinToString("\n") else "none"}
## Task description
${trace.metadata["task_description"] ?: "unknown"}
## Please propose skill improvements
""".trimIndent()
}
private fun validateOptimization(optimization: OptimizationSuggestion): Boolean {
return optimization.confidence > 0.7f &&
optimization.newSteps.isNotEmpty() &&
optimization.skillName.isNotBlank() &&
optimization.skillDescription.isNotBlank()
}
}
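`generateOptimization` passes the raw LLM reply straight to `Json.decodeFromString`, which fails when the model wraps its JSON in Markdown fences or adds a sentence of prose. A small stdlib-only pre-processing step, offered as an assumption rather than part of the original engine, can pull out the first balanced JSON object first. `extractJsonObject` is a hypothetical name.

```kotlin
/**
 * Return the first balanced top-level JSON object in an LLM reply, or null if none.
 * String literals are tracked (including escaped quotes) so braces inside
 * values such as "a}b" do not affect the depth counter.
 */
fun extractJsonObject(reply: String): String? {
    val start = reply.indexOf('{')
    if (start < 0) return null
    var depth = 0
    var inString = false
    var escaped = false
    for (i in start until reply.length) {
        val c = reply[i]
        if (inString) {
            when {
                escaped -> escaped = false      // character after a backslash is literal
                c == '\\' -> escaped = true
                c == '"' -> inString = false    // closing quote
            }
            continue
        }
        when (c) {
            '"' -> inString = true
            '{' -> depth++
            '}' -> if (--depth == 0) return reply.substring(start, i + 1)
        }
    }
    return null // unbalanced: the object never closed
}
```

It would be applied as `extractJsonObject(response) ?: response` before decoding. It still assumes that the first `{` in the reply starts the JSON, so a brace in leading prose would defeat it.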
IV. Closed learning loop
4.1 engine/LearningLoop.kt
package com.hermes.agent.engine
import com.hermes.agent.domain.*
import com.hermes.agent.storage.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
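import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit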
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
private val logger = KotlinLogging.logger {}
class ClosedLearningLoop(
private val kepaEngine: KepaEngine,
private val memoryManager: MemoryManager,
private val traceStorage: TraceStorage,
private val skillRepo: SkillRepository,
private val config: LearningConfig = LearningConfig() // val: member functions read it
) {
data class LearningConfig(
val skillNudgeInterval: Int = 15, // skill review every 15 tool calls
val memoryNudgeInterval: Int = 10, // memory review every 10 dialogue rounds
val maxParallelReviews: Int = 3, // maximum number of concurrent reviews
val reviewTimeout: Duration = 5.minutes
)
)
private val toolCallCounter = AtomicInteger(0)
private val dialogRoundCounter = AtomicInteger(0)
private val lastSkillReview = AtomicLong(0)
private val lastMemoryReview = AtomicLong(0)
private val _learningEvents = MutableSharedFlow<LearningEvent>()
val learningEvents: SharedFlow<LearningEvent> = _learningEvents.asSharedFlow()
sealed class LearningEvent {
data class SkillCreated(val skill: Skill) : LearningEvent()
data class SkillRefined(val skill: Skill, val previousVersion: Int) : LearningEvent()
data class MemoryUpdated(val summary: String, val entries: List<MemoryEntry>) : LearningEvent()
data class ReviewStarted(val type: ReviewType, val trigger: String) : LearningEvent()
data class ReviewCompleted(val type: ReviewType, val result: ReviewResult) : LearningEvent()
}
enum class ReviewType { SKILL, MEMORY }
sealed class ReviewResult {
data class Success(val insights: Int) : ReviewResult()
data class Failure(val error: String) : ReviewResult()
object NoChange : ReviewResult()
}
fun notifyToolCall() {
val count = toolCallCounter.incrementAndGet()
if (count % config.skillNudgeInterval == 0) {
launchReview(type = ReviewType.SKILL, trigger = "tool_calls:$count")
}
}
fun notifyDialogRound() {
val count = dialogRoundCounter.incrementAndGet()
if (count % config.memoryNudgeInterval == 0) {
launchReview(type = ReviewType.MEMORY, trigger = "dialog_rounds:$count")
}
}
// Long-lived scope for background reviews; SupervisorJob keeps one failed review from cancelling the others
private val reviewScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
// Caps how many reviews run at once (LearningConfig.maxParallelReviews)
private val reviewPermits = Semaphore(config.maxParallelReviews)
private fun launchReview(type: ReviewType, trigger: String) {
reviewScope.launch {
reviewPermits.withPermit {
_learningEvents.emit(LearningEvent.ReviewStarted(type, trigger))
val result = when (type) {
ReviewType.SKILL -> performSkillReview()
ReviewType.MEMORY -> performMemoryReview()
}
_learningEvents.emit(LearningEvent.ReviewCompleted(type, result))
when (result) {
is ReviewResult.Success -> {
logger.info {
"${type.name} review completed with ${result.insights} insights"
}
}
is ReviewResult.Failure -> {
logger.error { "${type.name} review failed: ${result.error}" }
}
ReviewResult.NoChange -> {
logger.debug { "${type.name} review: no changes needed" }
}
}
}
}
}
private suspend fun performSkillReview(): ReviewResult = withTimeoutOrNull(config.reviewTimeout) {
try {
// 1. Fetch recent execution traces
val recentTraces = traceStorage.getRecentTraces(
limit = 20,
minDuration = java.time.Duration.ofSeconds(1)
)
if (recentTraces.isEmpty()) {
return@withTimeoutOrNull ReviewResult.NoChange
}
// 2. Analyze the traces in parallel
val refinedSkills = recentTraces
.map { trace ->
async { kepaEngine.analyzeAndRefine(trace) }
}
.awaitAll()
.filterNotNull()
// 3. Publish learning events
refinedSkills.forEach { skill ->
_learningEvents.emit(LearningEvent.SkillRefined(skill, skill.version - 1))
}
// 4. Record the review time
lastSkillReview.set(System.currentTimeMillis())
return@withTimeoutOrNull ReviewResult.Success(refinedSkills.size)
} catch (e: CancellationException) {
// Rethrow so that withTimeoutOrNull can turn a timeout into null (handled below)
throw e
} catch (e: Exception) {
logger.error(e) { "Skill review failed" }
return@withTimeoutOrNull ReviewResult.Failure(e.message ?: "Unknown error")
}
} ?: ReviewResult.Failure("Review timeout after ${config.reviewTimeout}")
private suspend fun performMemoryReview(): ReviewResult = withContext(Dispatchers.IO) {
try {
// 1. Fetch recent conversations
val recentConversations = memoryManager.getRecentConversations(limit = 50)
if (recentConversations.isEmpty()) {
return@withContext ReviewResult.NoChange
}
// 2. Generate a memory summary
val summary = generateMemorySummary(recentConversations)
// 3. Create or update memory entries
val entries = createMemoryEntries(summary)
// 4. Clean up expired memories
val cleanedCount = memoryManager.cleanupExpiredMemories()
// 5. Publish the learning event
_learningEvents.emit(LearningEvent.MemoryUpdated(summary, entries))
// 6. Record the review time
lastMemoryReview.set(System.currentTimeMillis())
logger.debug {
"Memory review: created ${entries.size} entries, cleaned $cleanedCount expired"
}
ReviewResult.Success(entries.size)
} catch (e: CancellationException) {
// Never swallow cancellation
throw e
} catch (e: Exception) {
logger.error(e) { "Memory review failed" }
ReviewResult.Failure(e.message ?: "Unknown error")
}
}
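private suspend fun generateMemorySummary(conversations: List<Message>): String {
// Use the LLM to summarize the conversation
val conversationText = conversations.joinToString("\n") { msg ->
"${msg.role}: ${msg.content}"
}
val prompt = """
Summarize the key information and user preferences in the conversation below:
$conversationText
Extract:
1. Important facts
2. The user's personal preferences
3. Context worth remembering
4. The user's identity or role
Reply with concise bullet points.
""".trimIndent()
// The LLM call goes here: send `prompt` to the LLM client
// Simplified placeholder; a real implementation must call the LLM
return "Summary generation requires an LLM integration"
}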
private fun createMemoryEntries(summary: String): List<MemoryEntry> {
// Parse the summary and create memory entries
// Simplified placeholder
return emptyList()
}
suspend fun forceReview(type: ReviewType) {
when (type) {
ReviewType.SKILL -> launchReview(type, "manual_trigger")
ReviewType.MEMORY -> launchReview(type, "manual_trigger")
}
}
fun getStats(): Map<String, Any> = mapOf(
"tool_calls" to toolCallCounter.get(),
"dialog_rounds" to dialogRoundCounter.get(),
"last_skill_review" to lastSkillReview.get(),
"last_memory_review" to lastMemoryReview.get()
)
}
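`createMemoryEntries` is left as a stub. One possible approach, assuming the summary prompt is extended to ask the model to tag each bullet like `- [preference] ...`, is a line parser such as the one below. The tag vocabulary mirrors the four items the prompt asks for; `Kind`, `ParsedMemory`, and `parseMemorySummary` are hypothetical names, and mapping the results onto `MemoryEntry` (which also needs embeddings) is left out.

```kotlin
// Memory categories matching the four items the summary prompt asks for.
enum class Kind { FACT, PREFERENCE, CONTEXT, IDENTITY }

data class ParsedMemory(val kind: Kind, val text: String)

/**
 * Parse bullets such as "- [preference] Prefers answers in Kotlin".
 * Untagged bullets and unknown tags fall back to FACT; non-bullet lines are ignored.
 */
fun parseMemorySummary(summary: String): List<ParsedMemory> {
    val tagged = Regex("""^\[(\w+)]\s*(.+)$""")
    return summary.lineSequence()
        .map { it.trim() }
        .filter { it.startsWith("- ") || it.startsWith("* ") }
        .map { it.drop(2).trim() }
        .filter { it.isNotEmpty() }
        .map { body ->
            val match = tagged.find(body)
            if (match == null) {
                ParsedMemory(Kind.FACT, body)
            } else {
                val tag = match.groupValues[1]
                val kind = Kind.values().firstOrNull { it.name.equals(tag, ignoreCase = true) } ?: Kind.FACT
                ParsedMemory(kind, match.groupValues[2].trim())
            }
        }
        .toList()
}
```

Falling back to FACT keeps a sloppy or untagged model reply usable instead of dropping it; a stricter design could reject unknown tags and re-prompt.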
V. Vector database integration
5.1 storage/VectorStore.kt
package com.hermes.agent.storage
import com.hermes.agent.domain.Skill
import io.github.oshai.kotlinlogging.KotlinLogging
import io.milvus.client.MilvusServiceClient
import io.milvus.grpc.DataType
import io.milvus.param.ConnectParam
import io.milvus.param.collection.CreateCollectionParam
import io.milvus.param.collection.FieldType
import io.milvus.param.collection.HasCollectionParam
import io.milvus.param.collection.LoadCollectionParam
import io.milvus.param.dml.InsertParam
import io.milvus.param.dml.SearchParam
import io.milvus.response.SearchResultsWrapper
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put
private val logger = KotlinLogging.logger {}
class MilvusVectorStore(
host: String = "localhost",
port: Int = 19530,
collectionName: String = "hermes_skills"
) {
private val client: MilvusServiceClient
private val collectionName = collectionName
private val dim = 384 // assumes the all-MiniLM-L6-v2 embedding model (384 dimensions)
init {
val connectParam = ConnectParam.newBuilder()
.withHost(host)
.withPort(port)
.build()
// MilvusServiceClient is the SDK's concrete client class (MilvusClient is the interface)
client = MilvusServiceClient(connectParam)
initializeCollection()
}
private fun initializeCollection() {
val hasCollection = client.hasCollection(HasCollectionParam.newBuilder()
.withCollectionName(collectionName)
.build())
if (!hasCollection.data) {
logger.info { "Creating collection: $collectionName" }
val fieldId = FieldType.newBuilder()
.withName("id")
.withDataType(DataType.VarChar)
.withMaxLength(36)
.withPrimaryKey(true)
.build()
val fieldEmbedding = FieldType.newBuilder()
.withName("embedding")
.withDataType(DataType.FloatVector)
.withDimension(dim)
.build()
// Metadata is stored as a JSON string in a VarChar field
val fieldMetadata = FieldType.newBuilder()
.withName("metadata")
.withDataType(DataType.VarChar)
.withMaxLength(4096)
.build()
val createParam = CreateCollectionParam.newBuilder()
.withCollectionName(collectionName)
.withDescription("Hermes Agent skills vector store")
.addFieldType(fieldId)
.addFieldType(fieldEmbedding)
.addFieldType(fieldMetadata)
.build()
client.createCollection(createParam)
// Create the vector index
client.createIndex(...)
}
// A collection must be loaded into memory before it can be searched
client.loadCollection(LoadCollectionParam.newBuilder()
.withCollectionName(collectionName)
.build())
}
suspend fun insertSkill(skill: Skill) = withContext(Dispatchers.IO) {
// Serialize metadata as real JSON (Map.toString() would not produce valid JSON)
val metadata = buildJsonObject {
put("name", skill.name)
put("description", skill.description)
put("version", skill.version)
put("success_rate", skill.successRate)
put("tags", skill.tags.joinToString(","))
}.toString()
// Column-based insert: one InsertParam.Field per schema field, one value per row
val fields = listOf(
InsertParam.Field("id", listOf(skill.id)),
InsertParam.Field("embedding", listOf(skill.embeddings)),
InsertParam.Field("metadata", listOf(metadata))
)
val insertParam = InsertParam.newBuilder()
.withCollectionName(collectionName)
.withFields(fields)
.build()
client.insert(insertParam)
}
suspend fun searchSimilarSkills(
queryEmbedding: List<Float>,
limit: Int = 5,
minScore: Float = 0.7f
): List<SkillSearchResult> = withContext(Dispatchers.IO) {
val searchParam = SearchParam.newBuilder()
.withCollectionName(collectionName)
.withMetricType(io.milvus.param.MetricType.COSINE)
.withOutFields(listOf("id", "metadata"))
.withTopK(limit)
.withVectors(listOf(queryEmbedding))
.withVectorFieldName("embedding")
.build()
val response = client.search(searchParam)
if (response.status != 0) {
throw RuntimeException("Search failed: ${response.message}")
}
// SearchResultsWrapper takes the inner SearchResultData, not the SearchResults envelope
val wrapper = SearchResultsWrapper(response.data.results)
val metadataColumn = wrapper.getFieldData("metadata", 0)
return@withContext wrapper.getIDScore(0).mapIndexed { idx, idScore ->
SkillSearchResult(
skillId = idScore.strID,
similarity = idScore.score,
metadata = metadataColumn?.getOrNull(idx) as? String
)
}.filter { it.similarity >= minScore } // COSINE: higher means more similar
}
data class SkillSearchResult(
val skillId: String,
val similarity: Float,
val metadata: String?
)
}
VI. Main entry point
6.1 Main.kt
package com.hermes.agent
import com.hermes.agent.engine.ClosedLearningLoop
import com.hermes.agent.engine.EmbeddingService
import com.hermes.agent.engine.KepaEngine
import com.hermes.agent.llm.OpenAIClient
import com.hermes.agent.storage.MemoryManager
import com.hermes.agent.storage.SkillRepository
import com.hermes.agent.storage.TraceStorage
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.*
// ToolRegistry is defined at the bottom of this file, so it is not imported
private val logger = KotlinLogging.logger {}
object HermesAgentKotlin {
@JvmStatic
fun main(args: Array<String>) = runBlocking {
logger.info { "🚀 Starting Hermes Agent Kotlin..." }
// 1. Initialize components
val llmClient = OpenAIClient(
apiKey = System.getenv("OPENAI_API_KEY") ?: "your-api-key",
model = "gpt-4"
)
val skillRepo = SkillRepository()
val memoryManager = MemoryManager()
val traceStorage = TraceStorage()
val toolRegistry = ToolRegistry()
val kepaEngine = KepaEngine(
llmClient = llmClient,
skillRepo = skillRepo,
embeddingService = EmbeddingService()
)
val learningLoop = ClosedLearningLoop(
kepaEngine = kepaEngine,
memoryManager = memoryManager,
traceStorage = traceStorage,
skillRepo = skillRepo
)
// 2. Start the learning loop
val learningJob = launch {
learningLoop.learningEvents.collect { event ->
when (event) {
is ClosedLearningLoop.LearningEvent.SkillRefined -> {
logger.info {
"✨ Skill refined: ${event.skill.name} v${event.skill.version}"
}
}
is ClosedLearningLoop.LearningEvent.MemoryUpdated -> {
logger.debug {
"🧠 Memory updated with ${event.entries.size} entries"
}
}
else -> {}
}
}
}
// 3. Start the main conversation loop
val conversationJob = launch {
val conversationManager = ConversationManager(
llmClient = llmClient,
toolRegistry = toolRegistry,
skillRepo = skillRepo,
learningLoop = learningLoop
)
conversationManager.startConversation()
}
// 4. Graceful shutdown
Runtime.getRuntime().addShutdownHook(Thread {
runBlocking {
logger.info { "Shutting down Hermes Agent..." }
learningJob.cancelAndJoin()
conversationJob.cancelAndJoin()
// Persist state
skillRepo.close()
memoryManager.close()
logger.info { "Hermes Agent shutdown complete" }
}
})
// 5. Wait until the conversation loop ends, then stop background learning
// (awaitCancellation() inside NonCancellable would never return)
conversationJob.join()
learningJob.cancelAndJoin()
logger.info { "Hermes Agent stopped" }
}
}
// Example tool registry
class ToolRegistry {
private val tools = mutableMapOf<String, suspend (Map<String, Any>) -> String>()
fun register(name: String, tool: suspend (Map<String, Any>) -> String) {
tools[name] = tool
}
suspend fun callTool(name: String, args: Map<String, Any>): String {
return tools[name]?.invoke(args) ?: "Tool not found: $name"
}
}
// Example conversation manager
class ConversationManager(
private val llmClient: OpenAIClient,
private val toolRegistry: ToolRegistry,
private val skillRepo: SkillRepository,
private val learningLoop: ClosedLearningLoop
) {
suspend fun startConversation() {
// Conversation logic goes here
}
}
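In the `ToolRegistry` above, a missing tool returns the string `"Tool not found: ..."` as if it were a normal result, so that failure never shows up in the execution trace that KEPA analyzes. A dependency-free sketch of an alternative records every call as a trace step with an explicit success flag and reports each call to a listener such as a nudge counter. `RecordingToolRegistry` and `StepRecord` are hypothetical names, and the tools are plain (non-suspending) functions to keep the example stdlib-only.

```kotlin
// One recorded tool call, in the order it happened.
data class StepRecord(val order: Int, val tool: String, val ok: Boolean, val output: String)

/**
 * Tool registry that turns every call into a trace step with an explicit success flag,
 * and invokes onToolCall after each call (e.g. to drive a skill-review counter).
 */
class RecordingToolRegistry(private val onToolCall: () -> Unit = {}) {
    private val tools = mutableMapOf<String, (Map<String, String>) -> String>()
    private val steps = mutableListOf<StepRecord>()

    // Snapshot of the trace so callers cannot mutate the internal list.
    val trace: List<StepRecord> get() = steps.toList()

    fun register(name: String, tool: (Map<String, String>) -> String) {
        tools[name] = tool
    }

    fun call(name: String, args: Map<String, String>): Result<String> {
        val tool = tools[name]
        // Unknown tools and thrown exceptions both become failures, not normal output.
        val result: Result<String> = if (tool == null) {
            Result.failure(NoSuchElementException("Tool not found: $name"))
        } else {
            runCatching { tool(args) }
        }
        steps += StepRecord(
            order = steps.size + 1,
            tool = name,
            ok = result.isSuccess,
            output = result.getOrElse { it.message ?: it.toString() }
        )
        onToolCall()
        return result
    }
}
```

Because failures are data rather than strings, `extractFailurePattern` style analysis can find them by checking `ok` instead of parsing output text.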
VII. Configuration file
7.1 src/main/resources/application.conf
hermes {
agent {
name = "Hermes Agent Kotlin"
version = "1.0.0"
learning {
skill-nudge-interval = 15
memory-nudge-interval = 10
max-parallel-reviews = 3
review-timeout = "5 minutes"
}
llm {
provider = "openai"
model = "gpt-4"
temperature = 0.7
max-tokens = 2000
}
storage {
vector-db {
type = "milvus"
host = "localhost"
port = 19530
collection-name = "hermes_skills"
}
database {
type = "postgresql"
url = "jdbc:postgresql://localhost:5432/hermes"
username = "hermes"
password = ${?DB_PASSWORD}
}
}
tools {
enabled = ["web_search", "calculator", "file_reader", "code_executor"]
timeout = "30 seconds"
}
}
}
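VIII. Running and testing
8.1 Run script run.sh
#!/bin/bash
# Run Hermes Agent Kotlin
# Environment variables
export OPENAI_API_KEY="your-api-key"
export DB_PASSWORD="your-db-password"
export MILVUS_HOST="localhost"
export MILVUS_PORT="19530"
# Build and run. The plain jar from `./gradlew build` has neither a Main-Class entry nor the
# dependencies, so use the start script generated by the application plugin instead
# (the path assumes rootProject.name = "hermes-agent", matching the original jar name).
./gradlew installDist
./build/install/hermes-agent/bin/hermes-agent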
8.2 Test example TestHermes.kt
package com.hermes.agent.test
import com.hermes.agent.domain.*
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonPrimitive
import java.time.Instant
fun main() = runBlocking {
println("Testing Hermes Agent KEPA engine...")
// Build a test execution trace
val trace = ExecutionTrace(
taskId = "test-task-1",
sessionId = "test-session",
steps = listOf(
ExecutionStep(
order = 1,
toolName = "web_search",
toolArguments = mapOf("query" to JsonPrimitive("Kotlin coroutines best practices")),
status = ExecutionStep.StepStatus.SUCCESS
),
ExecutionStep(
order = 2,
toolName = "summarize",
toolArguments = mapOf("content" to JsonPrimitive("Search results about coroutines")),
status = ExecutionStep.StepStatus.FAILED,
toolResult = ToolResult.Error("Failed to summarize: API timeout")
)
),
startTime = Instant.now().minusSeconds(60),
endTime = Instant.now(),
success = false,
metadata = mapOf(
"task_description" to "Search and summarize Kotlin coroutines best practices"
)
)
println("Test trace created: ${Json.encodeToString(trace)}")
// The KEPA engine can be exercised from here
// Note: this requires real LLM client and skill repository implementations
}
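Summary of core features
This Kotlin implementation provides:
- A KEPA engine: failure-pattern extraction, optimization generation, and skill creation
- An asynchronous learning loop: background reviews built on coroutines
- Vector search integration: semantic skill retrieval through Milvus
- Type safety: makes full use of Kotlin's type system
- An extensible architecture: new tools and skill types are easy to add
- Operational basics: configuration, logging, and error handling
To run the complete system, you still need to implement:
- A real LLM client integration
- The database persistence layer
- A Web/CLI interface
- More built-in tools
This architecture gives Hermes Agent's self-evolution mechanism a solid foundation in Kotlin.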