本地事件总线

在单体服务或边缘组件中,使用“本地事件总线”可以将核心交易流程与非关键但必要的“旁路逻辑”(如审计、埋点、异步落库等)解耦,同时在需要时以异步方式削峰。本文给出一个工程化的最小可用实现与实践要点。

适用场景

  • 非关键路径的“扩展逻辑”:埋点、通知、审计、缓存刷新、异步刷库等。
  • 局部解耦:不引入外部 MQ 的情况下,让发布方不直接依赖消费者实现。
  • 小成本削峰:在请求线程外异步执行,减少主流程时延。

不适合的场景:需要跨进程/跨服务可靠投递、严格有序性、可回溯性或高可用的强需求(此类应考虑 Kafka/RabbitMQ/事件流平台)。

设计目标

  • 简洁 API:注册、发布、选择同步/异步。
  • 并发安全:注册/发布并发可用。
  • 隔离性:消费者异常不影响发布方主流程。
  • 可观测:最少限度的错误暴露与日志挂接点。

核心实现(线程安全、可选异步)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package eventbus

import (
"context"
"log"
"sync"
)

// 事件处理函数签名:按需自定义参数结构体
type HandlerFunc func(ctx context.Context, arg1, arg2 MyStruct)

type Bus struct {
mu sync.RWMutex
handlers map[string]HandlerFunc
}

func New() *Bus {
return &Bus{handlers: make(map[string]HandlerFunc)}
}

// Register 注册事件处理器;重复注册将覆盖旧处理器
func (b *Bus) Register(name string, h HandlerFunc) {
b.mu.Lock()
b.handlers[name] = h
b.mu.Unlock()
}

// Publish 发布事件;async=true 时在 goroutine 中执行
// 未注册的事件将返回 false 便于上层打点
func (b *Bus) Publish(ctx context.Context, name string, arg1, arg2 MyStruct, async bool) bool {
b.mu.RLock()
h, ok := b.handlers[name]
b.mu.RUnlock()
if !ok {
return false
}

run := func() {
// 防御:隔离消费者 panic,避免影响主流程
defer func() {
if r := recover(); r != nil {
log.Printf("eventbus handler panic: name=%s, recover=%v", name, r)
}
}()
h(ctx, arg1, arg2)
}

if async {
go run()
} else {
run()
}
return true
}

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 初始化总线
var bus = eventbus.New()

// 注册:例如“异步刷盘 MySQL”
bus.Register("async_flush_mysql", func(ctx context.Context, arg1, arg2 MyStruct) {
// 执行刷盘动作
})

// 发布:同步
ok := bus.Publish(ctx, "async_flush_mysql", a1, a2, false)
if !ok {
// 可记录未注册事件的观测日志
}

// 发布:异步
_ = bus.Publish(ctx, "async_flush_mysql", a1, a2, true)

工程实践要点

  • 错误与可观测:
    • 事件未注册时返回值用于统计/告警。
    • 建议在处理器内统一接入日志/指标埋点,并结合 context 传递 traceID。
  • 上下文与超时:
    • 异步处理应尊重 ctx,在耗时操作处检查取消/超时,避免后台 goroutine 漫游。
  • 并发与容量:
    • 如果异步任务可能堆积,增加限流/队列(如带缓冲 channel + worker 池)。
    • 长尾、重试、退避策略应在处理器内自行控制。
  • Panic 隔离:
    • 已在 Publish 中加了 recover,仍建议处理器内做细粒度防御。

与消息队列的对比

  • 本地事件总线:轻量、零依赖、进程内调用,可靠性受制于进程生命周期,不具备跨服务投递保障。
  • 专业 MQ/事件流:具备持久化、回溯、重放、消费组、顺序与扩展性;成本与复杂度更高。

小结

本地事件总线适合在单体或简易服务中对旁路逻辑进行解耦与异步化处理。通过线程安全的注册/发布、panic 隔离与必要的可观测性,即可以极低成本获得显著的工程收益;当需求升级到跨服务可靠投递与大规模扩展时,再自然过渡到专业消息系统。