Skip to content

带有自动重连机制的 Go RabbitMQ Publisher

Published: at 06:35 PM

前段时间业务中出现了一个问题,最后发现是 RabbitMQ 的连接断开导致的,所以我对 RabbitMQ Publisher 进行了改造,增加了自动重连机制。

我们原来在业务中使用 RabbitMQ 时,是以类似demo的最简单的方式来使用的(-_-),类似这样:

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
ch, err := conn.Channel()
err = ch.Publish(...)

结果某天发现 RabbitMQ 连接断开了,导致业务中的消息发送失败,所以简单写了一个包。

功能:

以下是代码:

import (
	"errors"
	"fmt"
	amqp "github.com/rabbitmq/amqp091-go"
	"log"
	"time"
)

const (
	reconnectInterval            = 5 * time.Second // 重连间隔时间
	defaultIdeConnectionCount    = 5               // 默认的连接数
	defaultIdeChannelCountPerCon = 5               // 每个连接的默认通道数
)

// 定义可能遇到的错误
var (
	errConClosed        = errors.New("Connection is closed.")        // 连接已关闭错误
	errConNotConnected  = errors.New("Connection not connected to a server.") // 连接未连接到服务器错误
	errChanNotConnected = errors.New("Channel not connected to a server.")    // 通道未连接到服务器错误
)

// MqConf 定义RabbitMQ的配置
type MqConf struct {
	Hosts                 []string // 主机地址列表
	Name                  string   // 用户名
	Pass                  string   // 密码
	Vhost                 string   // 虚拟主机
	IdeConnectionCount    int      // 理想的连接数
	IdeChannelCountPerCon int      // 每个连接的理想通道数
}

// mqClient 代表一个到RabbitMQ服务器的客户端实例
type mqClient struct {
	addrs                 []string     // AMQP地址列表
	ideConnectionCount    int          // 理想的连接数
	ideChannelCountPerCon int          // 每个连接的理想通道数
	headCon               *mqConnection // 链表头部的连接
	currentCon            *mqConnection // 当前的连接
}

// mqConnection 代表到RabbitMQ服务器的单个连接
type mqConnection struct {
	id              int           // 连接ID
	addr            string        // 连接地址
	notifyConnClose chan *amqp.Error // 连接关闭通知
	connection      *amqp.Connection // AMQP连接
	done            chan bool    // 用于停止监视goroutine的通道
	isReady         bool         // 标记连接是否准备就绪
	ideChannelCount int          // 理想的通道数
	nextCon         *mqConnection // 链表中的下一个连接
	headChan        *mqChannel   // 链表头部的通道
	currentChan     *mqChannel   // 当前的通道
}

// mqChannel 代表一个AMQP通道
type mqChannel struct {
	id              int           // 通道ID
	channel         *amqp.Channel // AMQP通道
	notifyChanClose chan *amqp.Error // 通道关闭通知
	nextChan        *mqChannel   // 链表中的下一个通道
	isReady         bool         // 标记通道是否准备就绪
	done            chan bool    // 用于停止监视goroutine的通道
}

// rabbitmqClient 管理所有的mqClient实例
type rabbitmqClient struct {
	store map[string]*mqClient // 通过名称索引的mqClient实例存储
}

// MqClienter 接口定义了客户端的操作
type MqClienter interface {
	GetClient(clientName string) (*amqp.Connection, error) // 获取指定客户端的连接
	GetChannel(clientName string) (*amqp.Channel, error)   // 获取指定客户端的通道
	Close(clientName string) error                         // 关闭指定客户端
}

// NewClient 根据提供的配置创建一个新的RabbitMQ客户端管理器
func NewClient(confs map[string]MqConf) MqClienter {
	store := &rabbitmqClient{
		store: make(map[string]*mqClient, len(confs)), // 初始化存储
	}

	for clientName, conf := range confs {
		// 检查配置的有效性
		if len(conf.Hosts) == 0 {
			log.Fatalln(fmt.Sprintf("New Rabbitmq client [%s] failed: empty conf.Hosts .", clientName))
				}

		// 确保连接数和通道数的配置有效,否则使用默认值
		if conf.IdeConnectionCount <= 0 {
			conf.IdeConnectionCount = defaultIdeConnectionCount
		}
		if conf.IdeChannelCountPerCon <= 0 {
			conf.IdeChannelCountPerCon = defaultIdeChannelCountPerCon
		}

		// 构建AMQP地址
		addrs := make([]string, len(conf.Hosts))
		for k, host := range conf.Hosts {
			addrs[k] = fmt.Sprintf("amqp://%s:%s@%s/%s", conf.Name, conf.Pass, host, conf.Vhost)
		}

		// 创建客户端实例
		client := &mqClient{
			addrs:                 addrs,
			ideConnectionCount:    conf.IdeConnectionCount,
			ideChannelCountPerCon: conf.IdeChannelCountPerCon,
		}
		log.Printf("===== Create client [%s] ... =====\n", clientName)
		// 加载并初始化客户端
		store.loadClient(clientName, client)
	}

	return store
}

// loadClient 加载并初始化指定名称的客户端实例
func (c *rabbitmqClient) loadClient(clientName string, client *mqClient) {
	count := 0

	for {
		// 为每个地址创建连接直到达到理想的连接数
		for _, addr := range client.addrs {
			if count >= client.ideConnectionCount {
				// 当达到理想的连接数,将客户端保存并退出循环
				c.store[clientName] = client
				return
			}

			// 创建并配置连接实例
			con := &mqConnection{
				id:              count,
				addr:            addr,
				notifyConnClose: make(chan *amqp.Error),
				ideChannelCount: client.ideChannelCountPerCon,
				done:            make(chan bool),
			}

			// 连接到RabbitMQ并加载通道
			client.loadConnection(con)
			count++
		}
	}
}

// loadConnection 连接到RabbitMQ服务器,并为每个连接加载通道
func (client *mqClient) loadConnection(con *mqConnection) {
	if err := con.connect(); err != nil {
		log.Fatalln("Connect failed. " + err.Error())
	}

	// 启动一个 goroutine 监听连接关闭事件
	go con.watch()

	// 为该连接加载通道
	if err := con.loadChannel(); err != nil {
		log.Fatalln("create channel failed. " + err.Error())
	}

	// 将新连接添加到链表中
	if client.headCon == nil {
		// 如果是第一个连接,初始化链表
		con.nextCon = con
		client.headCon = con
		client.currentCon = con
	} else {
		// 否则,添加到链表尾部
		con.nextCon = client.headCon.nextCon
		client.headCon.nextCon = con
	}
}

// connect 建立到RabbitMQ服务器的实际连接
func (con *mqConnection) connect() error {
	conn, err := amqp.Dial(con.addr)
	if err != nil {
		return err
	}

	// 更新连接状态,并注册连接关闭通知
	con.changeConnection(conn)
	log.Println(fmt.Sprintf("[%d] Connected to %s success!", con.id, con.addr))

	return nil
}

// changeConnection 更新连接实例并准备接收关闭通知
func (con *mqConnection) changeConnection(connection *amqp.Connection) {
	con.connection = connection
	con.notifyConnClose = make(chan *amqp.Error)
	con.connection.NotifyClose(con.notifyConnClose)
	con.isReady = true
}

// watch 监听连接的关闭事件,并尝试重连
func (con *mqConnection) watch() {
	for {
		select {
		case <-con.done:
			// 如果接收到结束信号,则退出监听
			return
		case <-con.notifyConnClose:
			// 连接关闭,尝试重连
			con.handleReconnect()
		}
	}
}

// handleReconnect 尝试重新连接到RabbitMQ服务器
func (con *mqConnection) handleReconnect() {
	for {
		con.isReady = false // 设置连接状态为不可用
		log.Println(fmt.Sprintf("[%d] Attempting to connect...", con.id))

		// 尝试建立连接
		if err := con.connect(); err != nil {
			// 如果连接失败,记录错误并等待一段时间后重试
			log.Println(fmt.Sprintf("[%d] Failed to connect %s. Retrying...", con.id, con.addr))

			select {
			case <-con.done:
				// 如果收到结束信号,则退出重连循环
				return
			case <-time.After(reconnectInterval):
				// 等待一定时间后再次尝试连接
			}
			continue
		}

		// 如果连接成功,退出循环
		break
	}
}

// watch 监听通道关闭通知,如果通道关闭,则尝试重建通道
func (ch *mqChannel) watch(mqcon *mqConnection) {
	for {
		select {
		case <-ch.done:
			// 如果收到结束信号,则停止监听并退出
			return
		case <-ch.notifyChanClose:
			// 如果通道关闭,尝试重新创建通道
			ch.handleRecreate(mqcon)
		}
	}
}

// handleRecreate 尝试重新创建通道
func (ch *mqChannel) handleRecreate(mqcon *mqConnection) {
	for {
		ch.isReady = false // 设置通道状态为不可用
		err := ch.recreate(mqcon)
		if err != nil {
			// 如果创建通道失败,记录错误并等待一段时间后重试
			log.Println(fmt.Sprintf("[%d-%d] Failed to create channel. Retrying...", mqcon.id, ch.id))

			select {
			case <-ch.done:
				// 如果收到结束信号,则停止重建并退出
				return
			case <-time.After(reconnectInterval):
				// 等待一定时间后再次尝试创建通道
			}
			continue
		}

		// 如果通道创建成功,记录成功信息并退出循环
		log.Println(fmt.Sprintf("[%d-%d] Recreate channel success!", mqcon.id, ch.id))
		break
	}
}

// recreate 重新创建通道
func (ch *mqChannel) recreate(con *mqConnection) error {
	if !con.isReady {
		return errConNotConnected // 如果连接不可用,返回对应错误
	}

	if con.connection.IsClosed() {
		return errConClosed // 如果连接已关闭,返回对应错误
	}

	// 尝试创建新的通道
	channel, err := con.connection.Channel()
	if err != nil {
		return err // 如果创建失败,返回错误
	}

	// 更新通道实例并标记为就绪
	ch.changeChannel(channel)
	return nil
}

// changeChannel 更新通道实例并重新注册通道关闭通知
func (ch *mqChannel) changeChannel(channel *amqp.Channel) {
	ch.channel = channel // 更新通道实例
	ch.notifyChanClose = make(chan *amqp.Error) // 重新创建通道关闭通知通道
	ch.channel.NotifyClose(ch.notifyChanClose) // 注册通道关闭通知
	ch.isReady = true // 标记通道为就绪状态
}

// closeConnections 关闭客户端的所有连接
func (client *mqClient) closeConnections() {
	if client.headCon == nil {
		return // 如果没有连接,直接返回
	}

	current := client.headCon

	for {
		if current.nextCon == nil {
			break // 遍历结束
		}
		current.closeChannels() // 关闭当前连接的所有通道
		close(current.done) // 发送结束信号
		if err := current.connection.Close(); err != nil {
			// 尝试关闭当前连接,如果失败则记录错误信息
			log.Println(fmt.Sprintf("[%d] Close connection failed. %s", current.id, err.Error()))
		}
		current.isReady = false // 标记连接为不可用

		next := current.nextCon // 移动到下一个连接
		current.nextCon = nil
		current = next
	}
}

// closeChannels 关闭连接的所有通道
func (con *mqConnection) closeChannels() {
    if con.headChan == nil {
        return // 如果没有通道,直接返回
    }

    current := con.headChan

    for {
        // 遍历连接中的所有通道,并尝试关闭它们
        close(current.done) // 发送结束信号给通道的监视goroutine
        if err := current.channel.Close(); err != nil {
            // 尝试关闭AMQP通道,如果失败则记录错误
            log.Println(fmt.Sprintf("[%d-%d] Close channel failed. %s", con.id, current.id, err.Error()))
        }
        current.isReady = false // 标记通道为不可用

        next := current.nextChan // 移动到链表中的下一个通道
        if next == current {
            break // 如果链表只有一个通道或所有通道已遍历完毕,则退出循环
        }
        current.nextChan = nil // 清除当前通道的下一个通道引用
        current = next // 将下一个通道设置为当前通道,继续循环
    }
}