RuaRuan

Hermes Agent: how it works, and implementing its key logic in Kotlin

How exactly is Hermes Agent's self-evolution mechanism implemented?

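Hermes Agent's "self-evolution" does not come from fine-tuning model weights. It relies on a "prompt backpropagation (KEPA)" mechanism combined with a "Closed Learning Loop". In essence, it improves performance by optimizing the strategy for using the LLM, while leaving the LLM's underlying parameters unchanged.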

1. Core mechanism: KEPA (prompt backpropagation)

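You can think of it as the "backpropagation" of a neural network, except that what gets updated is the agent's configuration rather than model parameters.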
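To make the analogy concrete, here is a minimal, stdlib-only Kotlin sketch. It is an illustration, not Hermes Agent's own code: `PromptConfig`, `Critique` and `backpropagate` are hypothetical names. The "gradient" is a textual critique of a failed run (in a real system an LLM would write it), and the "update step" edits the instructions instead of model weights.

```kotlin
// Hypothetical agent configuration: the "parameters" being optimized are instructions, not weights.
data class PromptConfig(val version: Int, val instructions: List<String>)

// Hypothetical "gradient": a textual critique of one failed run (in practice produced by an LLM).
data class Critique(val failedTool: String, val lesson: String)

// One "backward pass": fold critiques into the config as new instructions.
// Lessons that are already present are skipped, and the version only increases when something changed.
fun backpropagate(config: PromptConfig, critiques: List<Critique>): PromptConfig {
    val newLessons = critiques
        .map { "When using ${it.failedTool}: ${it.lesson}" }
        .filter { it !in config.instructions }
        .distinct()
    return if (newLessons.isEmpty()) config
    else config.copy(version = config.version + 1, instructions = config.instructions + newLessons)
}
```

Because the update is idempotent for lessons already learned, replaying the same critique does not keep growing the prompt.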

2. The closed learning flywheel (A Closed Learning Loop)

This is the concrete workflow that drives the evolution. It has four stages:

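  1. Execute: the agent handles the task normally and records the full tool-call chain and reasoning process.
  2. Evaluate: the system decides, automatically or from user feedback (such as corrections), whether the run succeeded and was efficient.
  3. Extract: successful execution patterns are automatically packaged into reusable Skills. A Skill is a Markdown file containing steps, parameters, and pitfalls to avoid.
  4. Refine: the next time a similar task comes up, the agent retrieves and applies the Skill instead of reasoning from scratch. If the result is still poor, the Skill's description is optimized again.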
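The four stages can be condensed into a small, stdlib-only Kotlin sketch. All names (`Trace`, `SkillDoc`, `SkillLibrary`) are hypothetical. Evaluation is reduced to a boolean success flag plus optional user feedback, and the stored skill is a plain list of steps rather than a Markdown file.

```kotlin
// Hypothetical record of one finished task run (output of the Execute stage).
data class Trace(val taskType: String, val toolCalls: List<String>, val success: Boolean)

// Hypothetical reusable skill: ordered steps plus pitfalls learned from failures.
data class SkillDoc(
    val taskType: String,
    val steps: List<String>,
    val pitfalls: List<String> = emptyList(),
    val version: Int = 1
)

class SkillLibrary {
    private val skills = mutableMapOf<String, SkillDoc>()

    // Reuse: retrieve a stored skill before reasoning from scratch
    fun lookup(taskType: String): SkillDoc? = skills[taskType]

    // Evaluate -> Extract -> Refine for one finished run
    fun learn(trace: Trace, feedback: String? = null): SkillDoc? {
        val existing = skills[trace.taskType]
        val updated = when {
            // Extract: the first successful run of a task type becomes a new skill
            trace.success && existing == null -> SkillDoc(trace.taskType, trace.toolCalls)
            // Refine: a failed run with feedback adds a pitfall and bumps the version
            !trace.success && existing != null && feedback != null ->
                existing.copy(pitfalls = existing.pitfalls + feedback, version = existing.version + 1)
            // Nothing to learn from this run
            else -> return existing
        }
        skills[trace.taskType] = updated
        return updated
    }
}
```

For example, a successful `deploy` run stores its tool calls as version 1; a later failed run with the feedback "run tests first" produces version 2 with that pitfall attached, and `lookup("deploy")` returns it for the next task.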

3. When it triggers: asynchronous background review (Nudge)

The evolution happens automatically and invisibly, triggered by two counters:

4. How the evolution artifacts are stored

Summary

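Hermes Agent's self-evolution = (KEPA optimizing the strategy) + (the closed learning loop producing Skills) + (persistent memory to avoid forgetting). By turning "experience" into reusable procedures (Skills) and optimized instructions (prompts), it gets smarter the more it is used, without relying on expensive model retraining.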

What language or framework is Hermes Agent built with?

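Hermes Agent is an open-source project written purely in Python. It does not depend on a complex front-end framework; its core design centers on a CLI (command line) and backend services.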

1. Core technologies and language stack

2. Key dependencies (Python stack)

Rather than a heavyweight web framework such as Django or Flask, it is built on a set of modern Python libraries:

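| Module | Main libraries | Notes |
|---|---|---|
| LLM interaction | openai, anthropic | Compatible with the OpenAI format and multiple model backends |
| Data structures | pydantic | Strongly typed data validation and configuration management |
| Networking | httpx | Async HTTP client, used for tool calls |
| CLI/TUI | rich, prompt_toolkit | Polished terminal user interface |
| Configuration | pyyaml, jinja2 | YAML configuration and dynamic prompt templates |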

3. Architectural characteristics and comparison

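If you deploy it locally in Chengdu (for example on macOS/Linux or under WSL2), you only need a working Python 3.11+ and pip; the official script installs it in one step, and there is no web front end to configure.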

How would I implement the key logic in Kotlin?

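Implementing Hermes Agent's "self-evolution" logic in Kotlin is a challenging but feasible architecture exercise. I'll use a layered architecture and show how Kotlin features such as coroutines, Flow, and DSLs can implement its key modules.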


1. Core architecture design

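// 1. Domain model layer
import java.time.Duration
import java.time.Instant

data class Skill(
    val id: String,
    val name: String,
    val description: String,
    val steps: List<Step>,            // Step: skill step type, omitted in this sketch
    val successCount: Int = 0,
    val lastUsed: Instant = Instant.now()
)

data class MemoryEntry(
    val key: String,
    val value: String,
    val embedding: FloatArray, // for semantic retrieval; note that data-class equals() compares arrays by reference
    val timestamp: Instant
)

data class ExecutionTrace(
    val taskId: String,
    val steps: List<ExecutionStep>,   // ExecutionStep: per-step record, omitted in this sketch
    val success: Boolean,
    val duration: Duration,
    val feedback: String? = null
)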

2. KEPA mechanism (key logic)

// 2. KEPA engine (prompt backpropagation)
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

class KepaEngine(
    private val skillRepo: SkillRepository,
    private val llmClient: LLMClient
) {
    // Asynchronously analyze an execution trace and refine the matching skill
    suspend fun analyzeAndRefine(trace: ExecutionTrace): Skill? {
        return withContext(Dispatchers.IO) {
            // 1. Extract the failure pattern; nothing to learn if no step failed
            val failurePattern = extractFailurePattern(trace) ?: return@withContext null

            // 2. Generate a refinement suggestion (LLM call)
            val optimization = llmClient.generateRefinement(
                prompt = buildRefinementPrompt(trace, failurePattern)
            )

            // 3. Update a similar existing skill, or create a new one
            skillRepo.findSimilarSkill(trace.taskId)?.let { existingSkill ->
                val refinedSkill = existingSkill.refineWith(optimization)
                skillRepo.update(refinedSkill)
                refinedSkill
            } ?: run {
                val newSkill = Skill(
                    id = generateSkillId(trace),
                    name = extractSkillName(trace),
                    description = optimization.description,
                    steps = optimization.steps
                )
                skillRepo.save(newSkill)
                newSkill
            }
        }
    }

    // Extract the first failure pattern (a lazy Sequence stops at the first failed step)
    private fun extractFailurePattern(trace: ExecutionTrace): FailurePattern? {
        return trace.steps
            .asSequence()
            .filter { it.status == StepStatus.FAILED }
            .map { step ->
                FailurePattern(
                    tool = step.toolName,
                    error = step.error,
                    context = step.context
                )
            }
            .firstOrNull()
    }
}

3. Closed learning loop

// 3. Learning-loop coordinator
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.launch

class ClosedLearningLoop(
    private val kepaEngine: KepaEngine,
    private val memoryManager: MemoryManager,
    private val skillTrigger: NudgeTrigger
) {
    // Event-driven learning loop. channelFlow lets the two triggers run concurrently:
    // two sequential collect calls would never reach the second, because the first never completes.
    val learningFlow: Flow<LearningEvent> = channelFlow {
        // Skill trigger: every 15 tool calls.
        // (Flow.sample(15) would sample every 15 ms, not every 15th value.)
        launch {
            skillTrigger.toolCallCountFlow
                .filter { it % 15 == 0 }
                .collect {
                    memoryManager.getRecentTraces(limit = 5).forEach { trace ->
                        kepaEngine.analyzeAndRefine(trace)?.let { skill ->
                            send(LearningEvent.SkillCreated(skill))
                        }
                    }
                }
        }

        // Memory trigger: every 10 dialogue rounds
        launch {
            skillTrigger.dialogRoundFlow
                .filter { it % 10 == 0 }
                .collect {
                    send(LearningEvent.MemoryUpdated(memoryManager.summarizeRecentDialog()))
                }
        }
    }

    // Runs the background learning loop; suspends until the calling coroutine is cancelled
    suspend fun startLearningJob() {
        learningFlow.collect { event ->
            when (event) {
                is LearningEvent.SkillCreated -> {
                    logSkillCreation(event.skill)
                    // KepaEngine.analyzeAndRefine has already saved the skill to the repository
                }
                is LearningEvent.MemoryUpdated -> updateLongTermMemory(event.summary)
            }
        }
    }
}

4. Asynchronous background review (Nudge)

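// 4. Nudge trigger. Counts are published through MutableSharedFlow: unlike a Channel consumed with
// consumeAsFlow(), which may only be collected once, a SharedFlow can have several subscribers
// (here the learning loop and the review service).
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import java.util.concurrent.atomic.AtomicInteger

class NudgeTrigger {
    private val toolCalls = AtomicInteger(0)
    private val dialogRounds = AtomicInteger(0)

    // extraBufferCapacity lets emit() return without waiting for slow collectors; when the buffer is
    // full, emit() suspends (default BufferOverflow.SUSPEND), so active collectors miss no count
    private val _toolCallCount = MutableSharedFlow<Int>(extraBufferCapacity = 64)
    private val _dialogRound = MutableSharedFlow<Int>(extraBufferCapacity = 64)

    val toolCallCountFlow: SharedFlow<Int> = _toolCallCount.asSharedFlow()
    val dialogRoundFlow: SharedFlow<Int> = _dialogRound.asSharedFlow()

    // Call after every tool call
    suspend fun notifyToolCall() {
        _toolCallCount.emit(toolCalls.incrementAndGet())
    }

    // Call after every dialogue round
    suspend fun notifyDialogRound() {
        _dialogRound.emit(dialogRounds.incrementAndGet())
    }
}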

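// 5. Asynchronous review service
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.launch

class AsyncReviewService(
    private val nudgeTrigger: NudgeTrigger
) {
    // One owned scope: SupervisorJob keeps a failed review from cancelling the others,
    // and close() cancels everything started here
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    init {
        // Start the background watchers
        scope.launch { watchSkillNudges() }
        scope.launch { watchMemoryNudges() }
    }

    private suspend fun watchSkillNudges() {
        nudgeTrigger.toolCallCountFlow
            .filter { it % 15 == 0 }
            .collect { triggerSkillReview() }
    }

    private suspend fun watchMemoryNudges() {
        nudgeTrigger.dialogRoundFlow
            .filter { it % 10 == 0 }
            .collect { /* trigger a memory review the same way (omitted) */ }
    }

    private fun triggerSkillReview() {
        // Fork the review: scope.launch returns immediately, so the watcher keeps counting.
        // (supervisorScope { launch { } } would suspend until the review finished.)
        scope.launch {
            val reviewAgent = ReviewAgent() // separate review instance
            reviewAgent.analyzeRecentTasks()
            // Results are written to the shared skill library
        }
    }

    fun close() = scope.cancel()
}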
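Stripped of coroutine plumbing, a nudge is a thread-safe counter that fires every N events. Below is a stdlib-only sketch (the `NudgeCounter` name is hypothetical) that can be configured with this section's intervals: 15 tool calls for a skill review and 10 dialogue rounds for a memory review. Note that kotlinx.coroutines' `Flow.sample(periodMillis)` is time-based, so `sample(15)` means "every 15 ms", not "every 15th value"; a modulo check on a counter expresses "every Nth call" directly.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter that fires once every `interval` events; safe to call from many threads.
class NudgeCounter(private val interval: Int, private val onNudge: (count: Int) -> Unit) {
    private val count = AtomicInteger(0)

    init {
        require(interval > 0) { "interval must be positive" }
    }

    // Returns true when this call reached a multiple of `interval` and triggered a review.
    fun record(): Boolean {
        val n = count.incrementAndGet()
        if (n % interval != 0) return false
        onNudge(n)
        return true
    }
}
```

In an agent you would create `NudgeCounter(15) { ... }` for tool calls and `NudgeCounter(10) { ... }` for dialogue rounds. Keep `onNudge` cheap (for example, launch a coroutine there) so that counting never waits for a review to finish.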

5. Skill persistence and retrieval

// 6. Skill storage (kotlinx.serialization + SQLDelight)
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.serialization.Serializable
import java.time.Instant

@Serializable
data class StoredSkill(
    val id: String,
    val name: String,
    val content: String, // Markdown
    val embedding: FloatArray,
    val metadata: Map<String, String>
)

class SkillRepository(
    private val db: SkillsDatabase,
    private val embeddingService: EmbeddingService
) {
    // Reactive query exposed as a Flow
    fun findRelevantSkills(query: String, limit: Int = 5): Flow<List<Skill>> = flow {
        // 1. Semantic retrieval via embeddings. findSimilar is an assumed custom query: core SQLite
        //    has no vector-similarity operator, so it needs an extension or a scan in application code
        val queryEmbedding = embeddingService.encode(query)
        val similarSkills = db.skillQueries.findSimilar(
            embedding = queryEmbedding,
            limit = limit.toLong()
        ).executeAsList()

        // 2. Map to domain objects
        emit(similarSkills.map { it.toDomain() })
    }

    // Incremental update via data-class copy()
    suspend fun refineSkill(skillId: String, refinement: String): StoredSkill {
        val existing = db.getSkillById(skillId)
        val refinedCount = (existing.metadata["refined_count"]?.toIntOrNull() ?: 0) + 1
        val updated = existing.copy(
            content = mergeContent(existing.content, refinement),
            metadata = existing.metadata + mapOf(
                "refined_at" to Instant.now().toString(),
                "refined_count" to refinedCount.toString() // metadata is Map<String, String>
            )
        )
        db.updateSkill(updated)
        return updated
    }
}
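The similarity query that `findSimilar` stands for is small enough to show as a brute-force scan in plain Kotlin. This sketch (hypothetical `cosine` and `topK` helpers) scores every stored embedding against the query and keeps the best k, which is workable for a few thousand skills; at larger scale a vector database such as Milvus is the usual choice.

```kotlin
import kotlin.math.sqrt

// Cosine similarity of two equal-length vectors; returns 0.0 if either vector is all zeros.
fun cosine(a: FloatArray, b: FloatArray): Double {
    require(a.size == b.size) { "dimension mismatch: ${a.size} vs ${b.size}" }
    var dot = 0.0
    var na = 0.0
    var nb = 0.0
    for (i in a.indices) {
        dot += a[i] * b[i]
        na += a[i] * a[i]
        nb += b[i] * b[i]
    }
    return if (na == 0.0 || nb == 0.0) 0.0 else dot / (sqrt(na) * sqrt(nb))
}

// Brute-force top-k: score every item, drop those below minScore, keep the k best (highest first).
fun <T> topK(
    query: FloatArray,
    items: List<Pair<T, FloatArray>>,
    k: Int,
    minScore: Double = 0.0
): List<Pair<T, Double>> =
    items.map { (item, emb) -> item to cosine(query, emb) }
        .filter { it.second >= minScore }
        .sortedByDescending { it.second }
        .take(k)
```

The `minScore` cut-off plays the role of a "highly similar" threshold: below it, the caller should create a new skill rather than refine an unrelated one.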

6. Complete startup example

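// 7. Launcher for the Kotlin version of Hermes Agent
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

object HermesAgentKotlin {
    @JvmStatic
    fun main(args: Array<String>) = runBlocking {
        // Initialize components. createSkillRepository/createLlmClient/createMemoryManager are
        // hypothetical factories standing in for your own implementations
        val skillRepo = createSkillRepository()
        val llmClient = createLlmClient()
        val memoryManager = createMemoryManager()

        val nudgeTrigger = NudgeTrigger()
        val kepaEngine = KepaEngine(skillRepo, llmClient)
        val learningLoop = ClosedLearningLoop(kepaEngine, memoryManager, nudgeTrigger)

        // Start the background learning task
        val learningJob = launch {
            learningLoop.startLearningJob()
        }

        // Main conversation agent
        val agent = ConversationalAgent(nudgeTrigger)

        // Graceful shutdown on Ctrl+C / SIGTERM
        Runtime.getRuntime().addShutdownHook(Thread {
            runBlocking {
                learningJob.cancel()
                agent.shutdown()
            }
        })

        agent.startInteractiveSession()
        // runBlocking waits for all of its children, so stop the endless learning job when the session ends
        learningJob.cancel()
    }
}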

7. Advantages of a Kotlin implementation

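  1. Coroutines: a natural fit for asynchronous reviews and background learning tasks
  2. Flow API: expresses the event-driven trigger mechanism cleanly
  3. Immutable data: data class + copy() makes every step of a skill's evolution traceable
  4. DSL support: a domain-specific language can be defined to describe skills
  5. Type safety: many errors are caught at compile time instead of at runtime
  6. Multiplatform: Kotlin can compile to Native, JS, Android, and other targets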
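Points 3 and 4 can be illustrated together with a small type-safe builder. The `skill { }` DSL below is hypothetical, not an existing library. Refinement returns a new value via `copy()`, so earlier versions stay intact for auditing, and `toMarkdown()` renders the Markdown form in which skills are stored.

```kotlin
@DslMarker
annotation class SkillDsl

data class SkillSpec(val name: String, val version: Int, val steps: List<String>, val pitfalls: List<String>)

@SkillDsl
class SkillBuilder(private val name: String) {
    private val steps = mutableListOf<String>()
    private val pitfalls = mutableListOf<String>()

    fun step(description: String) { steps += description }
    fun pitfall(description: String) { pitfalls += description }
    fun build() = SkillSpec(name, version = 1, steps = steps.toList(), pitfalls = pitfalls.toList())
}

// Entry point of the DSL: configure a builder with the block, then freeze it into an immutable SkillSpec.
fun skill(name: String, block: SkillBuilder.() -> Unit): SkillSpec = SkillBuilder(name).apply(block).build()

// Refinement never mutates: it returns the next version, so old versions can be kept.
fun SkillSpec.withPitfall(description: String): SkillSpec =
    copy(version = version + 1, pitfalls = pitfalls + description)

// Render to Markdown (steps as a numbered list, pitfalls as bullets).
fun SkillSpec.toMarkdown(): String = buildString {
    appendLine("# $name (v$version)")
    appendLine("## Steps")
    steps.forEachIndexed { i, s -> appendLine("${i + 1}. $s") }
    if (pitfalls.isNotEmpty()) {
        appendLine("## Pitfalls")
        pitfalls.forEach { appendLine("- $it") }
    }
}
```

Usage: `skill("deploy-service") { step("Run the test suite"); pitfall("Check the registry login before pushing") }` yields version 1, and `.withPitfall(...)` yields version 2 while the version-1 value is unchanged.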

8. Differences from the Python original

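| Aspect | Python original | Kotlin implementation |
|---|---|---|
| Concurrency model | asyncio event loop | Kotlin coroutines (lightweight, structured concurrency) |
| Persistence | SQLite + FTS5 | SQLDelight or Room (type-safe) |
| Skill retrieval | Simple vector search | Can integrate Milvus/Weaviate |
| Deployment | Pure Python environment | Can be compiled to a native binary |
| Ecosystem | Rich AI libraries | Enterprise-grade stability, but fewer AI libraries |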

If needed, I can go further and provide:

  1. A complete Gradle dependency configuration
  2. Integration code for the Milvus/Chroma vector databases
  3. A Ktor-based web gateway
  4. A skill-management UI for Android

Next, I'll build out the key components of a Kotlin version of Hermes Agent in more detail, including database integration, vector retrieval, and a tool-calling framework.

1. Project structure and Gradle configuration

1.1 build.gradle.kts

plugins {
    kotlin("jvm") version "1.9.23"
    kotlin("plugin.serialization") version "1.9.23"
    application
}

group = "com.hermes.agent"
version = "1.0.0"

repositories {
    mavenCentral()
    google()
}

dependencies {
    // Kotlin
    implementation(kotlin("stdlib"))
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
    implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.3")

    // AI/ML
    implementation("com.aallam.openai:openai-client:3.7.0")
    implementation("io.ktor:ktor-client-core:2.3.9")
    implementation("io.ktor:ktor-client-cio:2.3.9")
    implementation("ai.djl:api:0.25.0")
    implementation("ai.djl.huggingface:tokenizers:0.25.0")

    // Database
    implementation("org.jetbrains.exposed:exposed-core:0.45.0")
    implementation("org.jetbrains.exposed:exposed-jdbc:0.45.0")
    implementation("org.jetbrains.exposed:exposed-dao:0.45.0")
    implementation("com.h2database:h2:2.2.224")
    implementation("org.postgresql:postgresql:42.7.3")

    // Vector database
    implementation("io.milvus:milvus-sdk-java:2.3.8")

    // Tool calling (CLI parsing, HTTP)
    implementation("com.github.ajalt.clikt:clikt:4.2.2")
    implementation("com.squareup.okhttp3:okhttp:4.12.0")

    // Logging
    implementation("ch.qos.logback:logback-classic:1.4.14")
    implementation("io.github.oshai:kotlin-logging-jvm:5.1.0")

    // Testing
    testImplementation(kotlin("test"))
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.0")
}

application {
    mainClass.set("com.hermes.agent.MainKt")
}

tasks.test {
    useJUnitPlatform()
}

2. Core domain models

2.1 domain/Models.kt

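@file:UseSerializers(InstantSerializer::class) // lets every @Serializable class in this file use java.time.Instant

package com.hermes.agent.domain

import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.UseSerializers
import kotlinx.serialization.descriptors.PrimitiveKind
import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
import java.time.Instant
import java.util.UUID

// kotlinx.serialization has no built-in serializer for java.time.Instant; encode it as an ISO-8601 string
object InstantSerializer : KSerializer<Instant> {
    override val descriptor = PrimitiveSerialDescriptor("java.time.Instant", PrimitiveKind.STRING)
    override fun serialize(encoder: Encoder, value: Instant) = encoder.encodeString(value.toString())
    override fun deserialize(decoder: Decoder): Instant = Instant.parse(decoder.decodeString())
}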

@Serializable
data class Message(
    val id: String = UUID.randomUUID().toString(),
    val role: Role,
    val content: String,
    val timestamp: Instant = Instant.now(),
    val metadata: Map<String, String> = emptyMap()
) {
    enum class Role { SYSTEM, USER, ASSISTANT, TOOL }
}

@Serializable
data class ToolCall(
    val id: String = UUID.randomUUID().toString(),
    val name: String,
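    val arguments: Map<String, @kotlinx.serialization.Contextual Any>, // Any has no built-in serializer: register one in the Json module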
    val result: ToolResult? = null
)

@Serializable
sealed class ToolResult {
    @Serializable
    data class Success(
        val data: String,
        val metadata: Map<String, @kotlinx.serialization.Contextual Any> = emptyMap() // contextual: Any has no built-in serializer
    ) : ToolResult()

    @Serializable
    data class Error(
        val error: String,
        val stackTrace: String? = null,
        val recoverable: Boolean = true
    ) : ToolResult()
}

@Serializable
data class Skill(
    val id: String = UUID.randomUUID().toString(),
    val name: String,
    val description: String,
    val version: Int = 1,
    val steps: List<SkillStep>,
    val examples: List<SkillExample> = emptyList(),
    val successCount: Int = 0,
    val failureCount: Int = 0,
    val tags: Set<String> = emptySet(),
    val embeddings: List<Float> = emptyList(),
    val createdAt: Instant = Instant.now(),
    val lastUsed: Instant? = null,
    val metadata: Map<String, String> = emptyMap()
) {
    val successRate: Double
        get() = if (successCount + failureCount == 0) 0.0
               else successCount.toDouble() / (successCount + failureCount)

    fun refined(refinement: SkillRefinement): Skill = copy(
        steps = steps + refinement.newSteps,
        version = version + 1,
        examples = examples + refinement.newExamples,
        metadata = metadata + mapOf(
            "refined_at" to Instant.now().toString(),
            "refinement_reason" to refinement.reason
        )
    )
}

@Serializable
data class SkillStep(
    val order: Int,
    val action: String,  // tool call, llm reasoning, etc
    val parameters: Map<String, @kotlinx.serialization.Contextual Any>, // contextual: Any has no built-in serializer
    val description: String,
    val conditions: List<String> = emptyList()
)

@Serializable
data class SkillExample(
    val input: String,
    val output: String,
    val trace: ExecutionTrace? = null
)

@Serializable
data class SkillRefinement(
    val skillId: String,
    val reason: String,
    val newSteps: List<SkillStep>,
    val newExamples: List<SkillExample>,
    val triggerTrace: ExecutionTrace
)

@Serializable
data class ExecutionTrace(
    val id: String = UUID.randomUUID().toString(),
    val taskId: String,
    val sessionId: String,
    val steps: List<ExecutionStep>,
    val startTime: Instant,
    val endTime: Instant? = null,
    val success: Boolean? = null,
    val skillIds: List<String> = emptyList(),
    val metadata: Map<String, String> = emptyMap()
) {
    val duration: java.time.Duration
        get() = java.time.Duration.between(startTime, endTime ?: Instant.now())
}

@Serializable
data class ExecutionStep(
    val id: String = UUID.randomUUID().toString(),
    val order: Int,
    val toolName: String? = null,
    val toolArguments: Map<String, @kotlinx.serialization.Contextual Any> = emptyMap(), // contextual: Any has no built-in serializer
    val toolResult: ToolResult? = null,
    val reasoning: String? = null,
    val status: StepStatus,
    val timestamp: Instant = Instant.now(),
    val durationMs: Long? = null
) {
    enum class StepStatus { PENDING, RUNNING, SUCCESS, FAILED, SKIPPED }
}

@Serializable
data class MemoryEntry(
    val id: String = UUID.randomUUID().toString(),
    val userId: String? = null,
    val sessionId: String? = null,
    val key: String,
    val value: String,
    val type: MemoryType,
    val embeddings: List<Float>,
    val importance: Float = 0.5f,  // importance score in [0, 1]
    val accessedCount: Int = 0,
    val lastAccessed: Instant = Instant.now(),
    val createdAt: Instant = Instant.now(),
    val expiresAt: Instant? = null
) {
    enum class MemoryType { FACT, PREFERENCE, SKILL, RELATIONSHIP, EVENT }

    fun shouldForget(now: Instant = Instant.now()): Boolean {
        return expiresAt?.let { now.isAfter(it) } ?: false
    }
}

3. Complete KEPA engine implementation

3.1 engine/KepaEngine.kt

package com.hermes.agent.engine

import com.hermes.agent.domain.*
import com.hermes.agent.llm.LLMClient
import com.hermes.agent.storage.SkillRepository
import kotlinx.coroutines.*
import kotlinx.serialization.json.*
import kotlinx.serialization.encodeToString
import io.github.oshai.kotlinlogging.KotlinLogging // kotlin-logging 5.x package (mu.* is the pre-5.0 package)
import java.time.Instant

private val logger = KotlinLogging.logger {}
private val json = Json { prettyPrint = true }

class KepaEngine(
    private val llmClient: LLMClient,
    private val skillRepo: SkillRepository,
    private val embeddingService: EmbeddingService
) {
    data class FailurePattern(
        val tool: String,
        val error: String,
        val context: Map<String, Any>,
        val stepOrder: Int
    )

    data class OptimizationSuggestion(
        val skillName: String,
        val skillDescription: String,
        val newSteps: List<SkillStep>,
        val newExamples: List<SkillExample>,
        val reasoning: String,
        val confidence: Float
    )

    suspend fun analyzeAndRefine(trace: ExecutionTrace): Skill? = withContext(Dispatchers.IO) {
        try {
            logger.debug { "Analyzing trace ${trace.id} for skill refinement" }

            // 1. Extract the failure pattern
            val failurePattern = extractFailurePattern(trace)
            if (failurePattern == null) {
                logger.debug { "No clear failure pattern found in trace ${trace.id}" }
                return@withContext null
            }

            // 2. Extract success patterns (if any)
            val successPatterns = extractSuccessPatterns(trace)

            // 3. Generate a refinement suggestion
            val optimization = generateOptimization(trace, failurePattern, successPatterns)

            // 4. Validate the suggestion
            if (!validateOptimization(optimization)) {
                logger.warn { "Optimization validation failed for trace ${trace.id}" }
                return@withContext null
            }

            // 5. Find a similar skill or create a new one
            val skill = findOrCreateSkill(trace, optimization)

            // 6. Apply the refinement
            val refinedSkill = applyOptimization(skill, optimization, trace)

            // 7. Recompute the embedding
            val newEmbedding = embeddingService.generateSkillEmbedding(refinedSkill)
            val finalSkill = refinedSkill.copy(embeddings = newEmbedding)

            // 8. Save
            skillRepo.save(finalSkill)

            logger.info {
                "Skill ${finalSkill.name} refined (v${finalSkill.version}) " +
                "based on trace ${trace.id}"
            }

            return@withContext finalSkill

        } catch (e: CancellationException) {
            throw e // never swallow coroutine cancellation
        } catch (e: Exception) {
            logger.error(e) { "Failed to analyze and refine trace ${trace.id}" }
            return@withContext null
        }
    }

    private fun extractFailurePattern(trace: ExecutionTrace): FailurePattern? {
        val failedStep = trace.steps.firstOrNull { it.status == ExecutionStep.StepStatus.FAILED }
        return failedStep?.let { step ->
            FailurePattern(
                tool = step.toolName ?: "unknown",
                error = (step.toolResult as? ToolResult.Error)?.error ?: "Unknown error",
                context = buildStepContext(trace, step.order),
                stepOrder = step.order
            )
        }
    }

    private fun extractSuccessPatterns(trace: ExecutionTrace): List<Map<String, Any>> {
        return trace.steps
            .filter { it.status == ExecutionStep.StepStatus.SUCCESS }
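            .map { step ->
                // Non-null defaults keep the inferred type Map<String, Any> (nullable values would make it Map<String, Any?>)
                mapOf(
                    "tool" to (step.toolName ?: "reasoning"),
                    "arguments" to step.toolArguments,
                    "result" to ((step.toolResult as? ToolResult.Success)?.data ?: ""),
                    "context" to buildStepContext(trace, step.order)
                )
            }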
    }

    private suspend fun generateOptimization(
        trace: ExecutionTrace,
        failure: FailurePattern,
        successes: List<Map<String, Any>>
    ): OptimizationSuggestion = withContext(Dispatchers.IO) {
        val prompt = buildOptimizationPrompt(trace, failure, successes)

        val response = llmClient.complete(
            messages = listOf(
                Message(
                    role = Message.Role.SYSTEM,
                    content = """You are a skill-optimization expert. Analyze the task execution trace and propose concrete skill refinements.
                    The skill should: 1) address the failure pattern found 2) contain clear steps 3) include examples
                    The reply must be JSON in this format: {
                        "skillName": "string",
                        "skillDescription": "string",
                        "newSteps": [{"order": 1, "action": "...", "parameters": {...}, "description": "..."}],
                        "newExamples": [{"input": "...", "output": "..."}],
                        "reasoning": "string",
                        "confidence": 0.95
                    }"""
                ),
                Message(
                    role = Message.Role.USER,
                    content = prompt
                )
            ),
            temperature = 0.3
        )

        try {
            val jsonResponse = Json.parseToJsonElement(response).jsonObject
            OptimizationSuggestion(
                skillName = jsonResponse["skillName"]!!.jsonPrimitive.content,
                skillDescription = jsonResponse["skillDescription"]!!.jsonPrimitive.content,
                newSteps = parseSteps(jsonResponse["newSteps"]!!.jsonArray),
                newExamples = parseExamples(jsonResponse["newExamples"]!!.jsonArray),
                reasoning = jsonResponse["reasoning"]!!.jsonPrimitive.content,
                confidence = jsonResponse["confidence"]!!.jsonPrimitive.float
            )
        } catch (e: Exception) {
            throw IllegalArgumentException("Failed to parse LLM response: $response", e)
        }
    }

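    private fun parseSteps(jsonArray: JsonArray): List<SkillStep> {
        return jsonArray.map { jsonElement ->
            val obj = jsonElement.jsonObject
            SkillStep(
                order = obj["order"]!!.jsonPrimitive.int,
                action = obj["action"]!!.jsonPrimitive.content,
                parameters = (obj["parameters"] as? JsonObject)?.toPlainMap() ?: emptyMap(),
                description = obj["description"]!!.jsonPrimitive.content,
                conditions = (obj["conditions"] as? JsonArray)?.map { it.jsonPrimitive.content } ?: emptyList()
            )
        }
    }

    private fun parseExamples(jsonArray: JsonArray): List<SkillExample> {
        return jsonArray.map { jsonElement ->
            val obj = jsonElement.jsonObject
            SkillExample(
                input = obj["input"]!!.jsonPrimitive.content,
                output = obj["output"]!!.jsonPrimitive.content
            )
        }
    }

    // Convert a JsonObject into plain Kotlin values. Named toPlainMap to avoid confusion with the
    // stdlib Map.toMap(), which JsonObject (a Map<String, JsonElement>) already has.
    private fun JsonObject.toPlainMap(): Map<String, Any> {
        return this.mapValues { (_, value) ->
            when (value) {
                is JsonPrimitive -> {
                    when {
                        value.isString -> value.content
                        value.booleanOrNull != null -> value.boolean
                        value.intOrNull != null -> value.int
                        value.longOrNull != null -> value.long
                        value.doubleOrNull != null -> value.double
                        else -> value.content
                    }
                }
                // Nested objects/arrays inside an array are kept as their JSON text instead of crashing the cast
                is JsonArray -> value.map { (it as? JsonPrimitive)?.content ?: it.toString() }
                else -> value.toString()
            }
        }
    }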

    private fun findOrCreateSkill(
        trace: ExecutionTrace,
        optimization: OptimizationSuggestion
    ): Skill {
        // Embed the suggested description once instead of once per candidate
        val descriptionEmbedding = embeddingService.generateTextEmbedding(optimization.skillDescription)

        // 1. Look for similar skills
        val similarSkills = skillRepo.findSimilar(optimization.skillDescription, limit = 3)

        // 2. If one is highly similar (cosine > 0.85), refine that skill
        similarSkills.firstOrNull { skill ->
            embeddingService.cosineSimilarity(skill.embeddings, descriptionEmbedding) > 0.85
        }?.let { return it }

        // 3. Otherwise create a new, empty skill at version 0. applyOptimization() then adds the
        //    suggested steps and examples and bumps it to v1, so they are not added twice.
        return Skill(
            name = optimization.skillName,
            description = optimization.skillDescription,
            version = 0,
            steps = emptyList(),
            embeddings = descriptionEmbedding
        )
    }

    private fun applyOptimization(
        skill: Skill,
        optimization: OptimizationSuggestion,
        trace: ExecutionTrace
    ): Skill {
        val refinement = SkillRefinement(
            skillId = skill.id,
            reason = "Auto-refinement: ${optimization.reasoning}",
            newSteps = optimization.newSteps,
            newExamples = optimization.newExamples,
            triggerTrace = trace
        )

        return skill.refined(refinement)
    }

    // Helpers
    private fun buildStepContext(trace: ExecutionTrace, stepOrder: Int): Map<String, Any> {
        val previousSteps = trace.steps
            .filter { it.order < stepOrder }
            .sortedBy { it.order }
            .takeLast(3)  // only the 3 most recent preceding steps serve as context

        return mapOf(
            "previous_steps" to previousSteps.map { it.toolName ?: "reasoning" },
            "task_id" to trace.taskId,
            "session_id" to trace.sessionId
        )
    }

    private fun buildOptimizationPrompt(
        trace: ExecutionTrace,
        failure: FailurePattern,
        successes: List<Map<String, Any>>
    ): String {
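        // Maps are interpolated with toString(): json.encodeToString cannot serialize Map<String, Any>
        // (there is no serializer for Any) and would throw at runtime
        return """
            ## Task execution trace analysis

            Task ID: ${trace.taskId}
            Session ID: ${trace.sessionId}
            Duration: ${trace.duration.toMillis()}ms
            Success: ${trace.success ?: "unknown"}

            ## Failed step (step ${failure.stepOrder})
            Tool: ${failure.tool}
            Error: ${failure.error}
            Context: ${failure.context}

            ## Successful steps
            ${if (successes.isNotEmpty()) successes.joinToString("; ") else "none"}

            ## Task description
            ${trace.metadata["task_description"] ?: "unknown"}

            ## Please propose skill refinements
        """.trimIndent()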
    }

    private fun validateOptimization(optimization: OptimizationSuggestion): Boolean {
        return optimization.confidence > 0.7f &&
               optimization.newSteps.isNotEmpty() &&
               optimization.skillName.isNotBlank() &&
               optimization.skillDescription.isNotBlank()
    }
}
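`generateOptimization` hands the raw completion straight to the JSON parser, but chat models often wrap JSON in Markdown code fences or add a sentence before it. A small pre-processing step makes parsing more tolerant. The helper below is a hypothetical, heuristic sketch: it keeps the text from the first `{` to the last `}` and does not validate the JSON itself.

```kotlin
// Hypothetical helper: return the substring from the first '{' to the last '}', or null if there is none.
// Heuristic only: strips fences and chatter around a single JSON object, without validating it.
fun extractJsonObject(response: String): String? {
    val start = response.indexOf('{')
    val end = response.lastIndexOf('}')
    return if (start >= 0 && end > start) response.substring(start, end + 1) else null
}
```

The result would then be passed to `Json.parseToJsonElement(...)`; a null result can be treated like any other parse failure.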

4. Closed learning loop

4.1 engine/LearningLoop.kt

package com.hermes.agent.engine

import com.hermes.agent.domain.*
import com.hermes.agent.storage.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import io.github.oshai.kotlinlogging.KotlinLogging // kotlin-logging 5.x package (mu.* is the pre-5.0 package)
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes

private val logger = KotlinLogging.logger {}

class ClosedLearningLoop(
    private val kepaEngine: KepaEngine,
    private val memoryManager: MemoryManager,
    private val traceStorage: TraceStorage,
    private val skillRepo: SkillRepository,
    private val config: LearningConfig = LearningConfig() // a property, since the methods below read it
) {
    data class LearningConfig(
        val skillNudgeInterval: Int = 15,   // trigger a skill review every 15 tool calls
        val memoryNudgeInterval: Int = 10,  // trigger a memory review every 10 dialogue rounds
        val maxParallelReviews: Int = 3,    // maximum number of concurrent reviews
        val reviewTimeout: Duration = 5.minutes
    )

    private val toolCallCounter = AtomicInteger(0)
    private val dialogRoundCounter = AtomicInteger(0)
    private val lastSkillReview = AtomicLong(0)
    private val lastMemoryReview = AtomicLong(0)

    private val _learningEvents = MutableSharedFlow<LearningEvent>()
    val learningEvents: SharedFlow<LearningEvent> = _learningEvents.asSharedFlow()

    sealed class LearningEvent {
        data class SkillCreated(val skill: Skill) : LearningEvent()
        data class SkillRefined(val skill: Skill, val previousVersion: Int) : LearningEvent()
        data class MemoryUpdated(val summary: String, val entries: List<MemoryEntry>) : LearningEvent()
        data class ReviewStarted(val type: ReviewType, val trigger: String) : LearningEvent()
        data class ReviewCompleted(val type: ReviewType, val result: ReviewResult) : LearningEvent()
    }

    enum class ReviewType { SKILL, MEMORY }
    sealed class ReviewResult {
        data class Success(val insights: Int) : ReviewResult()
        data class Failure(val error: String) : ReviewResult()
        object NoChange : ReviewResult()
    }

    fun notifyToolCall() {
        val count = toolCallCounter.incrementAndGet()
        if (count % config.skillNudgeInterval == 0) {
            launchReview(type = ReviewType.SKILL, trigger = "tool_calls:$count")
        }
    }

    fun notifyDialogRound() {
        val count = dialogRoundCounter.incrementAndGet()
        if (count % config.memoryNudgeInterval == 0) {
            launchReview(type = ReviewType.MEMORY, trigger = "dialog_rounds:$count")
        }
    }

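    // One owned scope for all reviews (SupervisorJob: a failed review does not cancel the others),
    // plus a semaphore so that at most maxParallelReviews reviews run at the same time
    private val reviewScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    private val reviewPermits = kotlinx.coroutines.sync.Semaphore(config.maxParallelReviews)

    private fun launchReview(type: ReviewType, trigger: String) {
        reviewScope.launch {
            reviewPermits.acquire()
            try {
                _learningEvents.emit(LearningEvent.ReviewStarted(type, trigger))

                val result = when (type) {
                    ReviewType.SKILL -> performSkillReview()
                    ReviewType.MEMORY -> performMemoryReview()
                }

                _learningEvents.emit(LearningEvent.ReviewCompleted(type, result))

                when (result) {
                    is ReviewResult.Success ->
                        logger.info { "${type.name} review completed with ${result.insights} insights" }
                    is ReviewResult.Failure ->
                        logger.error { "${type.name} review failed: ${result.error}" }
                    ReviewResult.NoChange ->
                        logger.debug { "${type.name} review: no changes needed" }
                }
            } finally {
                reviewPermits.release()
            }
        }
    }

    // Cancel all running and pending reviews
    fun close() = reviewScope.cancel()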

    private suspend fun performSkillReview(): ReviewResult = withTimeoutOrNull(config.reviewTimeout) {
        try {
            // 1. Fetch recent execution traces
            val recentTraces = traceStorage.getRecentTraces(
                limit = 20,
                minDuration = java.time.Duration.ofSeconds(1)
            )

            if (recentTraces.isEmpty()) {
                return@withTimeoutOrNull ReviewResult.NoChange
            }

            // 2. Analyze the traces in parallel
            val refinedSkills = recentTraces
                .map { trace ->
                    async { kepaEngine.analyzeAndRefine(trace) }
                }
                .awaitAll()
                .filterNotNull()

            // 3. Publish learning events
            refinedSkills.forEach { skill ->
                _learningEvents.emit(LearningEvent.SkillRefined(skill, skill.version - 1))
            }

            // 4. Record the review time
            lastSkillReview.set(System.currentTimeMillis())

            return@withTimeoutOrNull ReviewResult.Success(refinedSkills.size)

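        } catch (e: CancellationException) {
            throw e // rethrow so withTimeoutOrNull can report the timeout and cancellation propagates
        } catch (e: Exception) {
            logger.error(e) { "Skill review failed" }
            return@withTimeoutOrNull ReviewResult.Failure(e.message ?: "Unknown error")
        }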
    } ?: ReviewResult.Failure("Review timeout after ${config.reviewTimeout}")

    private suspend fun performMemoryReview(): ReviewResult = withContext(Dispatchers.IO) {
        try {
            // 1. Fetch recent conversations
            val recentConversations = memoryManager.getRecentConversations(limit = 50)

            if (recentConversations.isEmpty()) {
                return@withContext ReviewResult.NoChange
            }

            // 2. Generate a memory summary
            val summary = generateMemorySummary(recentConversations)

            // 3. Create/update memory entries
            val entries = createMemoryEntries(summary)

            // 4. Remove expired memories
            val cleanedCount = memoryManager.cleanupExpiredMemories()

            // 5. Publish learning events
            _learningEvents.emit(LearningEvent.MemoryUpdated(summary, entries))

            // 6. Record the review time
            lastMemoryReview.set(System.currentTimeMillis())

            logger.debug {
                "Memory review: created ${entries.size} entries, cleaned $cleanedCount expired"
            }

            ReviewResult.Success(entries.size)

        } catch (e: CancellationException) {
            throw e // never swallow coroutine cancellation
        } catch (e: Exception) {
            logger.error(e) { "Memory review failed" }
            ReviewResult.Failure(e.message ?: "Unknown error")
        }
    }

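    private suspend fun generateMemorySummary(conversations: List<Message>): String {
        // Use the LLM to summarize the conversation
        val conversationText = conversations.joinToString("\n") { msg ->
            "${msg.role}: ${msg.content}"
        }

        // The conversation is appended after trimIndent(): interpolating multi-line text into the raw
        // string first would add zero-indent lines, and trimIndent() would then trim nothing
        val prompt = """
            Summarize the key information and user preferences in the following conversation.

            Extract:
            1. Important facts
            2. The user's personal preferences
            3. Context worth remembering
            4. The user's identity or role

            Reply as concise bullet points.

            Conversation:
        """.trimIndent() + "\n" + conversationText

        // Stub: a real implementation would send `prompt` to the LLM and return its reply
        return "Summary generation requires an LLM integration"
    }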

    private fun createMemoryEntries(summary: String): List<MemoryEntry> {
        // Parse the summary into memory entries
        // Stub: not implemented yet
        return emptyList()
    }

    // Both review types are launched the same way; launchReview returns immediately
    fun forceReview(type: ReviewType) = launchReview(type, "manual_trigger")

    fun getStats(): Map<String, Any> = mapOf(
        "tool_calls" to toolCallCounter.get(),
        "dialog_rounds" to dialogRoundCounter.get(),
        "last_skill_review" to lastSkillReview.get(),
        "last_memory_review" to lastMemoryReview.get()
    )
}
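`createMemoryEntries` above is a stub that returns an empty list. A minimal sketch of one way to fill it, assuming the LLM follows the prompt and replies with one bullet point per fact. `SummaryFact`, `parseSummary` and `normalize` are hypothetical names, and duplicate detection here uses simple text normalization rather than the embeddings that the real `MemoryEntry` carries.

```kotlin
// Hypothetical simplified memory record; the real MemoryEntry also carries an embedding.
data class SummaryFact(val key: String, val value: String)

// Matches bullet prefixes such as "- ", "* ", "• ", "1. " or "2) " at the start of a line.
val bulletPrefix = Regex("""^\s*(?:[-*•]|\d+[.)])\s+""")

// Lowercases and collapses whitespace so near-identical bullets share a key.
fun normalize(text: String): String =
    text.lowercase().replace(Regex("""\s+"""), " ")

/**
 * Parses a bullet-point summary into facts, one per bullet line.
 * Non-bullet lines (headings, blank lines) are skipped, and facts whose
 * normalized text repeats an earlier one are dropped.
 */
fun parseSummary(summary: String): List<SummaryFact> =
    summary.lineSequence()
        .filter { bulletPrefix.containsMatchIn(it) }
        .map { it.replace(bulletPrefix, "").trim() }
        .filter { it.isNotEmpty() }
        .map { text -> SummaryFact(key = normalize(text), value = text) }
        .distinctBy { it.key }
        .toList()
```

Keying on normalized text keeps the sketch dependency-free; a fuller version would compare embeddings so that paraphrased facts also merge.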

5. Vector Database Integration

5.1 storage/VectorStore.kt

package com.hermes.agent.storage

import com.hermes.agent.domain.Skill
import io.milvus.client.MilvusServiceClient
import io.milvus.grpc.DataType
import io.milvus.param.ConnectParam
import io.milvus.param.MetricType
import io.milvus.param.collection.CreateCollectionParam
import io.milvus.param.collection.FieldType
import io.milvus.param.collection.HasCollectionParam
import io.milvus.param.collection.LoadCollectionParam
import io.milvus.param.dml.InsertParam
import io.milvus.param.dml.SearchParam
import io.milvus.response.SearchResultsWrapper
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put
import mu.KotlinLogging

private val logger = KotlinLogging.logger {}

// Written against the Milvus Java SDK v2 (MilvusServiceClient); the COSINE metric needs Milvus 2.3+
class MilvusVectorStore(
    host: String = "localhost",
    port: Int = 19530,
    private val collectionName: String = "hermes_skills"
) {
    private val dim = 384  // Assumes the all-MiniLM-L6-v2 embedding model (384 dimensions)

    private val client = MilvusServiceClient(
        ConnectParam.newBuilder()
            .withHost(host)
            .withPort(port)
            .build()
    )

    init {
        initializeCollection()
    }

    private fun initializeCollection() {
        val hasCollection = client.hasCollection(
            HasCollectionParam.newBuilder()
                .withCollectionName(collectionName)
                .build()
        )

        if (!hasCollection.data) {
            logger.info { "Creating collection: $collectionName" }

            val fieldId = FieldType.newBuilder()
                .withName("id")
                .withDataType(DataType.VarChar)
                .withMaxLength(36)
                .withPrimaryKey(true)
                .build()

            val fieldEmbedding = FieldType.newBuilder()
                .withName("embedding")
                .withDataType(DataType.FloatVector)
                .withDimension(dim)
                .build()

            // Metadata is stored as a JSON string in a VarChar field
            val fieldMetadata = FieldType.newBuilder()
                .withName("metadata")
                .withDataType(DataType.VarChar)
                .withMaxLength(4096)
                .build()

            val createParam = CreateCollectionParam.newBuilder()
                .withCollectionName(collectionName)
                .withDescription("Hermes Agent skills vector store")
                .addFieldType(fieldId)
                .addFieldType(fieldEmbedding)
                .addFieldType(fieldMetadata)
                .build()

            client.createCollection(createParam)

            // Create an index on "embedding" (parameters elided in the original);
            // its metric type must match the COSINE metric used in searchSimilarSkills
            client.createIndex(...)
        }

        // A collection must be loaded into memory before it can be searched
        client.loadCollection(
            LoadCollectionParam.newBuilder()
                .withCollectionName(collectionName)
                .build()
        )
    }

    suspend fun insertSkill(skill: Skill) = withContext(Dispatchers.IO) {
        // Serialize metadata as real JSON; Map.toString() yields "{name=...}", which is not JSON
        val metadata = buildJsonObject {
            put("name", skill.name)
            put("description", skill.description)
            put("version", skill.version)
            put("success_rate", skill.successRate)
            put("tags", skill.tags.joinToString(","))
        }.toString()

        // Column-based insert: one Field per schema field, one value per row.
        // skill.embeddings is assumed to be a List<Float> of length `dim`
        val fields = listOf(
            InsertParam.Field("id", listOf(skill.id)),
            InsertParam.Field("embedding", listOf(skill.embeddings)),
            InsertParam.Field("metadata", listOf(metadata))
        )

        val insertParam = InsertParam.newBuilder()
            .withCollectionName(collectionName)
            .withFields(fields)
            .build()

        val response = client.insert(insertParam)
        if (response.status != 0) {
            throw RuntimeException("Insert failed: ${response.message}")
        }
    }

    suspend fun searchSimilarSkills(
        queryEmbedding: List<Float>,
        limit: Int = 5,
        minScore: Float = 0.7f
    ): List<SkillSearchResult> = withContext(Dispatchers.IO) {
        val searchParam = SearchParam.newBuilder()
            .withCollectionName(collectionName)
            .withMetricType(MetricType.COSINE)
            .withOutFields(listOf("id", "metadata"))
            .withTopK(limit)
            .withVectors(listOf(queryEmbedding))
            .withVectorFieldName("embedding")
            .build()

        val response = client.search(searchParam)

        if (response.status != 0) {
            throw RuntimeException("Search failed: ${response.message}")
        }

        // The wrapper takes the inner SearchResultData, not the SearchResults envelope
        val wrapper = SearchResultsWrapper(response.data.results)
        val metadataColumn = wrapper.getFieldData("metadata", 0)

        // With COSINE, a higher score means more similar
        wrapper.getIDScore(0).mapIndexed { idx, idScore ->
            SkillSearchResult(
                skillId = idScore.strID,
                similarity = idScore.score,
                metadata = metadataColumn?.get(idx) as? String
            )
        }.filter { it.similarity >= minScore }
    }

    data class SkillSearchResult(
        val skillId: String,
        val similarity: Float,
        val metadata: String?
    )
}
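When Milvus is not available (for example in unit tests), the same search contract can be approximated in memory. The following is a hypothetical brute-force sketch, not part of the Milvus SDK: it computes cosine similarity against every stored vector, keeps the top `limit`, then drops hits below `minScore`, mirroring the order of operations in `searchSimilarSkills` (top-K first, then the score filter).

```kotlin
import kotlin.math.sqrt

// Hypothetical in-memory stand-in for MilvusVectorStore, for tests and small skill catalogs.
class InMemoryVectorStore {
    private val vectors = mutableMapOf<String, FloatArray>()

    fun insert(id: String, embedding: FloatArray) {
        vectors[id] = embedding.copyOf() // defensive copy so callers can't mutate stored vectors
    }

    /** Returns (id, cosine similarity) pairs: the top [limit] by score, then filtered by [minScore]. */
    fun search(query: FloatArray, limit: Int = 5, minScore: Float = 0.7f): List<Pair<String, Float>> =
        vectors.map { (id, v) -> id to cosine(query, v) }
            .sortedByDescending { it.second }
            .take(limit)
            .filter { it.second >= minScore }

    // Cosine similarity = dot(a, b) / (|a| * |b|); zero vectors are treated as dissimilar.
    private fun cosine(a: FloatArray, b: FloatArray): Float {
        require(a.size == b.size) { "Dimension mismatch: ${a.size} vs ${b.size}" }
        var dot = 0.0
        var normA = 0.0
        var normB = 0.0
        for (i in a.indices) {
            val x = a[i].toDouble()
            val y = b[i].toDouble()
            dot += x * y
            normA += x * x
            normB += y * y
        }
        if (normA == 0.0 || normB == 0.0) return 0f
        return (dot / (sqrt(normA) * sqrt(normB))).toFloat()
    }
}
```

A linear scan is O(n·d) per query, which is fine for a few thousand skills; the index built in Milvus exists to avoid exactly this cost at larger scale.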

6. Main Entry Point

6.1 Main.kt

package com.hermes.agent

import com.hermes.agent.engine.ClosedLearningLoop
import com.hermes.agent.engine.KepaEngine
import com.hermes.agent.llm.OpenAIClient
import com.hermes.agent.storage.MemoryManager
import com.hermes.agent.storage.SkillRepository
import com.hermes.agent.storage.TraceStorage
import kotlinx.coroutines.*
import mu.KotlinLogging

private val logger = KotlinLogging.logger {}

object HermesAgentKotlin {

    @JvmStatic
    fun main(args: Array<String>) = runBlocking {
        logger.info { "🚀 Starting Hermes Agent Kotlin..." }

        // 1. Initialize components (fail fast instead of calling the API with a placeholder key)
        val llmClient = OpenAIClient(
            apiKey = System.getenv("OPENAI_API_KEY")
                ?: error("OPENAI_API_KEY is not set"),
            model = "gpt-4"
        )

        val skillRepo = SkillRepository()
        val memoryManager = MemoryManager()
        val traceStorage = TraceStorage()
        val toolRegistry = ToolRegistry()

        val kepaEngine = KepaEngine(
            llmClient = llmClient,
            skillRepo = skillRepo,
            embeddingService = EmbeddingService()
        )

        val learningLoop = ClosedLearningLoop(
            kepaEngine = kepaEngine,
            memoryManager = memoryManager,
            traceStorage = traceStorage,
            skillRepo = skillRepo
        )

        // 2. Start the learning loop (listen for learning events)
        val learningJob = launch {
            learningLoop.learningEvents.collect { event ->
                when (event) {
                    is ClosedLearningLoop.LearningEvent.SkillRefined -> {
                        logger.info {
                            "✨ Skill refined: ${event.skill.name} v${event.skill.version}"
                        }
                    }
                    is ClosedLearningLoop.LearningEvent.MemoryUpdated -> {
                        logger.debug {
                            "🧠 Memory updated with ${event.entries.size} entries"
                        }
                    }
                    else -> {}
                }
            }
        }

        // 3. Start the main conversation loop
        val conversationJob = launch {
            val conversationManager = ConversationManager(
                llmClient = llmClient,
                toolRegistry = toolRegistry,
                skillRepo = skillRepo,
                learningLoop = learningLoop
            )

            conversationManager.startConversation()
        }

        // 4. Graceful shutdown handling
        Runtime.getRuntime().addShutdownHook(Thread {
            runBlocking {
                logger.info { "Shutting down Hermes Agent..." }

                learningJob.cancelAndJoin()
                conversationJob.cancelAndJoin()

                // Persist state
                skillRepo.close()
                memoryManager.close()

                logger.info { "Hermes Agent shutdown complete" }
            }
        })

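        // 5. Keep running until the conversation loop ends (or the shutdown hook cancels it),
        //    then stop the learning-event listener. The previous awaitCancellation() inside
        //    withContext(NonCancellable) could never be cancelled, so its catch was unreachable.
        conversationJob.join()
        learningJob.cancelAndJoin()
        logger.info { "Hermes Agent stopped" }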
    }
}

// Example tool registry
class ToolRegistry {
    private val tools = mutableMapOf<String, suspend (Map<String, Any>) -> String>()

    fun register(name: String, tool: suspend (Map<String, Any>) -> String) {
        tools[name] = tool
    }

    suspend fun callTool(name: String, args: Map<String, Any>): String {
        return tools[name]?.invoke(args) ?: "Tool not found: $name"
    }
}

// Example conversation manager (stub)
class ConversationManager(
    private val llmClient: OpenAIClient,
    private val toolRegistry: ToolRegistry,
    private val skillRepo: SkillRepository,
    private val learningLoop: ClosedLearningLoop
) {
    suspend fun startConversation() {
        // Implement the conversation logic here
    }
}
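The event handler in `main` ends with `else -> {}` because the full set of `LearningEvent` subtypes is not shown here. If `LearningEvent` is declared as a sealed type, the compiler can check exhaustiveness: the `else` branch can be dropped, and adding a new event type turns every unhandled `when` into a compile error instead of a silently ignored event. The sketch below assumes simplified, hypothetical event shapes; only the names `SkillRefined` and `MemoryUpdated` come from the original, and their fields are reduced to plain values.

```kotlin
// Hypothetical, simplified event hierarchy; the real events carry Skill / MemoryEntry objects.
sealed interface LearningEvent {
    data class SkillRefined(val skillName: String, val version: Int) : LearningEvent
    data class MemoryUpdated(val entryCount: Int) : LearningEvent
}

// Exhaustive `when` over a sealed type: no `else` branch is needed, and adding a
// new LearningEvent subtype makes this function fail to compile until it is handled.
fun describe(event: LearningEvent): String = when (event) {
    is LearningEvent.SkillRefined -> "Skill refined: ${event.skillName} v${event.version}"
    is LearningEvent.MemoryUpdated -> "Memory updated with ${event.entryCount} entries"
}
```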

7. Configuration File

7.1 src/main/resources/application.conf

hermes {
  agent {
    name = "Hermes Agent Kotlin"
    version = "1.0.0"

    learning {
      skill-nudge-interval = 15
      memory-nudge-interval = 10
      max-parallel-reviews = 3
      review-timeout = "5 minutes"
    }

    llm {
      provider = "openai"
      model = "gpt-4"
      temperature = 0.7
      max-tokens = 2000
    }

    storage {
      vector-db {
        type = "milvus"
        host = "localhost"
        host = ${?MILVUS_HOST}
        port = 19530
        port = ${?MILVUS_PORT}
        collection-name = "hermes_skills"
      }

      database {
        type = "postgresql"
        url = "jdbc:postgresql://localhost:5432/hermes"
        username = "hermes"
        password = ${?DB_PASSWORD}
      }
    }

    tools {
      enabled = ["web_search", "calculator", "file_reader", "code_executor"]
      timeout = "30 seconds"
    }
  }
}
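The `skill-nudge-interval` and `memory-nudge-interval` settings pair naturally with the two counters reported by `getStats()` (`tool_calls` and `dialog_rounds`). A minimal, thread-safe sketch of how such an interval could gate background reviews, assuming a review fires each time a counter reaches a multiple of its interval. `NudgeCounter` and its callback shape are hypothetical; the learning loop itself launches coroutine reviews rather than calling a plain function.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

/**
 * Hypothetical counter that calls [onNudge] every [interval] increments.
 * AtomicInteger keeps increments safe when tool calls complete on several threads.
 */
class NudgeCounter(private val interval: Int, private val onNudge: (count: Int) -> Unit) {
    init {
        require(interval > 0) { "interval must be positive, was $interval" }
    }

    private val count = AtomicInteger(0)

    /** Records one event; returns true if this increment triggered a nudge. */
    fun record(): Boolean {
        val current = count.incrementAndGet()
        val fire = current % interval == 0
        if (fire) onNudge(current)
        return fire
    }

    fun current(): Int = count.get()
}
```

With `skill-nudge-interval = 15`, one such counter would be incremented per tool call; a second counter with interval 10 would be incremented once per dialog round for memory reviews.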

8. Running and Testing

8.1 Run script: run.sh

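#!/bin/bash
# Run Hermes Agent Kotlin
set -euo pipefail

# Set environment variables (replace the placeholders with real values)
export OPENAI_API_KEY="your-api-key"
export DB_PASSWORD="your-db-password"
export MILVUS_HOST="localhost"
export MILVUS_PORT="19530"

# Build and run
# Assumes the build is configured to produce a runnable fat jar with Main-Class set;
# by default the jar task includes neither the dependencies nor a Main-Class entry
./gradlew build
java -jar build/libs/hermes-agent-1.0.0.jar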

8.2 Test example: TestHermes.kt

package com.hermes.agent.test

import com.hermes.agent.engine.KepaEngine
import com.hermes.agent.domain.*
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import java.time.Instant

fun main() = runBlocking {
    println("Testing Hermes Agent KEPA engine...")

    // Create a test execution trace
    val trace = ExecutionTrace(
        taskId = "test-task-1",
        sessionId = "test-session",
        steps = listOf(
            ExecutionStep(
                order = 1,
                toolName = "web_search",
                toolArguments = mapOf("query" to "Kotlin coroutines best practices"),
                status = ExecutionStep.StepStatus.SUCCESS
            ),
            ExecutionStep(
                order = 2,
                toolName = "summarize",
                toolArguments = mapOf("content" to "Search results about coroutines"),
                status = ExecutionStep.StepStatus.FAILED,
                toolResult = ToolResult.Error("Failed to summarize: API timeout")
            )
        ),
        startTime = Instant.now().minusSeconds(60),
        endTime = Instant.now(),
        success = false,
        metadata = mapOf(
            "task_description" to "Search and summarize Kotlin coroutines best practices"
        )
    )

    println("Test trace created: ${Json.encodeToString(trace)}")

    // The KEPA engine can be tested further from here
    // Note: this requires real LLM client and skill repository implementations
}
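The test above stops after printing the trace. One piece that can be checked without any LLM client is the first step of failure-pattern extraction: finding which step failed and why. The sketch below uses hypothetical, stripped-down `Step`/`Trace` types (not the domain classes above) so it runs standalone; `firstFailure` is an illustrative helper, not part of the original `KepaEngine`.

```kotlin
// Hypothetical stripped-down trace types, independent of the real domain model.
enum class Status { SUCCESS, FAILED }

data class Step(val order: Int, val toolName: String, val status: Status, val error: String? = null)

data class Trace(val taskId: String, val steps: List<Step>, val success: Boolean)

/** A failed step reduced to the fields a refinement prompt would need. */
data class FailurePoint(val taskId: String, val order: Int, val toolName: String, val error: String)

/** Returns the earliest failed step (by order), or null if every step succeeded. */
fun firstFailure(trace: Trace): FailurePoint? =
    trace.steps
        .sortedBy { it.order }
        .firstOrNull { it.status == Status.FAILED }
        ?.let { FailurePoint(trace.taskId, it.order, it.toolName, it.error ?: "unknown error") }
```

Applied to a trace shaped like the one in `TestHermes.kt`, this would point at step 2 (`summarize`) with the API-timeout error, which is the kind of input a refinement step would turn into an updated skill.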

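Summary of Core Features

This Kotlin implementation provides:

  1. A complete KEPA engine: failure-pattern extraction, optimization generation, and skill creation
  2. An asynchronous learning loop: a coroutine-based background review mechanism
  3. Vector search integration: semantic skill retrieval through Milvus
  4. Type safety: makes full use of Kotlin's type system
  5. An extensible architecture: new tools and skill types are easy to add
  6. Production readiness: includes configuration, logging, and error handling

To run the complete system, the following still need to be implemented:

This architecture provides a solid foundation for implementing Hermes Agent's self-evolution mechanism in Kotlin.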


