前段时间业务中出现了一个问题,最后发现是 RabbitMQ 的连接断开导致的,所以我对 RabbitMQ Publisher 进行了改造,增加了自动重连机制。
我们原来在业务中使用 RabbitMQ 时,是以类似demo的最简单的方式来使用的(-_-),类似这样:
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
ch, err := conn.Channel()
err = ch.Publish(...)
结果某天发现 RabbitMQ 连接断开了,导致业务中的消息发送失败,所以简单写了一个包。
功能:
- 支持多个 RabbitMQ 连接
- 每个 Connection 支持多个 Channel
- Connection 和 Channel 支持自动重连
以下是代码:
import (
"errors"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"log"
"time"
)
const (
reconnectInterval = 5 * time.Second // 重连间隔时间
defaultIdeConnectionCount = 5 // 默认的连接数
defaultIdeChannelCountPerCon = 5 // 每个连接的默认通道数
)
// 定义可能遇到的错误
var (
errConClosed = errors.New("Connection is closed.") // 连接已关闭错误
errConNotConnected = errors.New("Connection not connected to a server.") // 连接未连接到服务器错误
errChanNotConnected = errors.New("Channel not connected to a server.") // 通道未连接到服务器错误
)
// MqConf 定义RabbitMQ的配置
type MqConf struct {
Hosts []string // 主机地址列表
Name string // 用户名
Pass string // 密码
Vhost string // 虚拟主机
IdeConnectionCount int // 理想的连接数
IdeChannelCountPerCon int // 每个连接的理想通道数
}
// mqClient 代表一个到RabbitMQ服务器的客户端实例
type mqClient struct {
addrs []string // AMQP地址列表
ideConnectionCount int // 理想的连接数
ideChannelCountPerCon int // 每个连接的理想通道数
headCon *mqConnection // 链表头部的连接
currentCon *mqConnection // 当前的连接
}
// mqConnection 代表到RabbitMQ服务器的单个连接
type mqConnection struct {
id int // 连接ID
addr string // 连接地址
notifyConnClose chan *amqp.Error // 连接关闭通知
connection *amqp.Connection // AMQP连接
done chan bool // 用于停止监视goroutine的通道
isReady bool // 标记连接是否准备就绪
ideChannelCount int // 理想的通道数
nextCon *mqConnection // 链表中的下一个连接
headChan *mqChannel // 链表头部的通道
currentChan *mqChannel // 当前的通道
}
// mqChannel 代表一个AMQP通道
type mqChannel struct {
id int // 通道ID
channel *amqp.Channel // AMQP通道
notifyChanClose chan *amqp.Error // 通道关闭通知
nextChan *mqChannel // 链表中的下一个通道
isReady bool // 标记通道是否准备就绪
done chan bool // 用于停止监视goroutine的通道
}
// rabbitmqClient 管理所有的mqClient实例
type rabbitmqClient struct {
store map[string]*mqClient // 通过名称索引的mqClient实例存储
}
// MqClienter 接口定义了客户端的操作
type MqClienter interface {
GetClient(clientName string) (*amqp.Connection, error) // 获取指定客户端的连接
GetChannel(clientName string) (*amqp.Channel, error) // 获取指定客户端的通道
Close(clientName string) error // 关闭指定客户端
}
// NewClient 根据提供的配置创建一个新的RabbitMQ客户端管理器
func NewClient(confs map[string]MqConf) MqClienter {
store := &rabbitmqClient{
store: make(map[string]*mqClient, len(confs)), // 初始化存储
}
for clientName, conf := range confs {
// 检查配置的有效性
if len(conf.Hosts) == 0 {
log.Fatalln(fmt.Sprintf("New Rabbitmq client [%s] failed: empty conf.Hosts .", clientName))
}
// 确保连接数和通道数的配置有效,否则使用默认值
if conf.IdeConnectionCount <= 0 {
conf.IdeConnectionCount = defaultIdeConnectionCount
}
if conf.IdeChannelCountPerCon <= 0 {
conf.IdeChannelCountPerCon = defaultIdeChannelCountPerCon
}
// 构建AMQP地址
addrs := make([]string, len(conf.Hosts))
for k, host := range conf.Hosts {
addrs[k] = fmt.Sprintf("amqp://%s:%s@%s/%s", conf.Name, conf.Pass, host, conf.Vhost)
}
// 创建客户端实例
client := &mqClient{
addrs: addrs,
ideConnectionCount: conf.IdeConnectionCount,
ideChannelCountPerCon: conf.IdeChannelCountPerCon,
}
log.Printf("===== Create client [%s] ... =====\n", clientName)
// 加载并初始化客户端
store.loadClient(clientName, client)
}
return store
}
// loadClient 加载并初始化指定名称的客户端实例
func (c *rabbitmqClient) loadClient(clientName string, client *mqClient) {
count := 0
for {
// 为每个地址创建连接直到达到理想的连接数
for _, addr := range client.addrs {
if count >= client.ideConnectionCount {
// 当达到理想的连接数,将客户端保存并退出循环
c.store[clientName] = client
return
}
// 创建并配置连接实例
con := &mqConnection{
id: count,
addr: addr,
notifyConnClose: make(chan *amqp.Error),
ideChannelCount: client.ideChannelCountPerCon,
done: make(chan bool),
}
// 连接到RabbitMQ并加载通道
client.loadConnection(con)
count++
}
}
}
// loadConnection 连接到RabbitMQ服务器,并为每个连接加载通道
func (client *mqClient) loadConnection(con *mqConnection) {
if err := con.connect(); err != nil {
log.Fatalln("Connect failed. " + err.Error())
}
// 启动一个 goroutine 监听连接关闭事件
go con.watch()
// 为该连接加载通道
if err := con.loadChannel(); err != nil {
log.Fatalln("create channel failed. " + err.Error())
}
// 将新连接添加到链表中
if client.headCon == nil {
// 如果是第一个连接,初始化链表
con.nextCon = con
client.headCon = con
client.currentCon = con
} else {
// 否则,添加到链表尾部
con.nextCon = client.headCon.nextCon
client.headCon.nextCon = con
}
}
// connect 建立到RabbitMQ服务器的实际连接
func (con *mqConnection) connect() error {
conn, err := amqp.Dial(con.addr)
if err != nil {
return err
}
// 更新连接状态,并注册连接关闭通知
con.changeConnection(conn)
log.Println(fmt.Sprintf("[%d] Connected to %s success!", con.id, con.addr))
return nil
}
// changeConnection 更新连接实例并准备接收关闭通知
func (con *mqConnection) changeConnection(connection *amqp.Connection) {
con.connection = connection
con.notifyConnClose = make(chan *amqp.Error)
con.connection.NotifyClose(con.notifyConnClose)
con.isReady = true
}
// watch 监听连接的关闭事件,并尝试重连
func (con *mqConnection) watch() {
for {
select {
case <-con.done:
// 如果接收到结束信号,则退出监听
return
case <-con.notifyConnClose:
// 连接关闭,尝试重连
con.handleReconnect()
}
}
}
// handleReconnect 尝试重新连接到RabbitMQ服务器
func (con *mqConnection) handleReconnect() {
for {
con.isReady = false // 设置连接状态为不可用
log.Println(fmt.Sprintf("[%d] Attempting to connect...", con.id))
// 尝试建立连接
if err := con.connect(); err != nil {
// 如果连接失败,记录错误并等待一段时间后重试
log.Println(fmt.Sprintf("[%d] Failed to connect %s. Retrying...", con.id, con.addr))
select {
case <-con.done:
// 如果收到结束信号,则退出重连循环
return
case <-time.After(reconnectInterval):
// 等待一定时间后再次尝试连接
}
continue
}
// 如果连接成功,退出循环
break
}
}
// watch 监听通道关闭通知,如果通道关闭,则尝试重建通道
func (ch *mqChannel) watch(mqcon *mqConnection) {
for {
select {
case <-ch.done:
// 如果收到结束信号,则停止监听并退出
return
case <-ch.notifyChanClose:
// 如果通道关闭,尝试重新创建通道
ch.handleRecreate(mqcon)
}
}
}
// handleRecreate 尝试重新创建通道
func (ch *mqChannel) handleRecreate(mqcon *mqConnection) {
for {
ch.isReady = false // 设置通道状态为不可用
err := ch.recreate(mqcon)
if err != nil {
// 如果创建通道失败,记录错误并等待一段时间后重试
log.Println(fmt.Sprintf("[%d-%d] Failed to create channel. Retrying...", mqcon.id, ch.id))
select {
case <-ch.done:
// 如果收到结束信号,则停止重建并退出
return
case <-time.After(reconnectInterval):
// 等待一定时间后再次尝试创建通道
}
continue
}
// 如果通道创建成功,记录成功信息并退出循环
log.Println(fmt.Sprintf("[%d-%d] Recreate channel success!", mqcon.id, ch.id))
break
}
}
// recreate 重新创建通道
func (ch *mqChannel) recreate(con *mqConnection) error {
if !con.isReady {
return errConNotConnected // 如果连接不可用,返回对应错误
}
if con.connection.IsClosed() {
return errConClosed // 如果连接已关闭,返回对应错误
}
// 尝试创建新的通道
channel, err := con.connection.Channel()
if err != nil {
return err // 如果创建失败,返回错误
}
// 更新通道实例并标记为就绪
ch.changeChannel(channel)
return nil
}
// changeChannel 更新通道实例并重新注册通道关闭通知
func (ch *mqChannel) changeChannel(channel *amqp.Channel) {
ch.channel = channel // 更新通道实例
ch.notifyChanClose = make(chan *amqp.Error) // 重新创建通道关闭通知通道
ch.channel.NotifyClose(ch.notifyChanClose) // 注册通道关闭通知
ch.isReady = true // 标记通道为就绪状态
}
// closeConnections 关闭客户端的所有连接
func (client *mqClient) closeConnections() {
if client.headCon == nil {
return // 如果没有连接,直接返回
}
current := client.headCon
for {
if current.nextCon == nil {
break // 遍历结束
}
current.closeChannels() // 关闭当前连接的所有通道
close(current.done) // 发送结束信号
if err := current.connection.Close(); err != nil {
// 尝试关闭当前连接,如果失败则记录错误信息
log.Println(fmt.Sprintf("[%d] Close connection failed. %s", current.id, err.Error()))
}
current.isReady = false // 标记连接为不可用
next := current.nextCon // 移动到下一个连接
current.nextCon = nil
current = next
}
}
// closeChannels 关闭连接的所有通道
func (con *mqConnection) closeChannels() {
if con.headChan == nil {
return // 如果没有通道,直接返回
}
current := con.headChan
for {
// 遍历连接中的所有通道,并尝试关闭它们
close(current.done) // 发送结束信号给通道的监视goroutine
if err := current.channel.Close(); err != nil {
// 尝试关闭AMQP通道,如果失败则记录错误
log.Println(fmt.Sprintf("[%d-%d] Close channel failed. %s", con.id, current.id, err.Error()))
}
current.isReady = false // 标记通道为不可用
next := current.nextChan // 移动到链表中的下一个通道
if next == current {
break // 如果链表只有一个通道或所有通道已遍历完毕,则退出循环
}
current.nextChan = nil // 清除当前通道的下一个通道引用
current = next // 将下一个通道设置为当前通道,继续循环
}
}