Go
Go连接kafka消费队列
Go连接kafka消费队列
最近接到一个需求,就是需要开发一个脚本去消费kafka队列,然后发送消息给意向客户,综合考虑,最后决定使用Go来实现,最终很快完成脚本开发,特意跟大家分享,首先我们创建一个Go项目目录messagePush,初始化mod,输入以下命令行
go mod init messagePush
创建完之后,我们新建一个配置文件,用于存放配置信息,命名为config.json,配置如下
{
"kafka_connection": "172.31.128.1:9092",
"kafka_topic": "AICustIntentionPush"
}然后创建运行文件,命名为main.go,代码如下
package main
//消费kafka脚本发送加微请求
//引入扩展
import (
"sync"
"encoding/json"
"io/ioutil"
"fmt"
"github.com/Shopify/sarama"
"time"
"github.com/rifflock/lfshook"
"github.com/sirupsen/logrus"
"os"
"path"
"github.com/lestrrat/go-file-rotatelogs"
"strconv"
)
//进程
var wg sync.WaitGroup
//定义配置文件解析后的结构
type Config struct {
KafkaConnection string `json:"kafka_connection"`
KafkaTopic string `json:"kafka_topic"`
KafkaTaskId float64 `json:"kafka_task_id"`
KafkaTaskIdB float64 `json:"kafka_task_id_b"`
}
//空配置文件
var configData Config
/**
程序启动自定义
*/
func init() {
//初始化读取配置文件
jsonByte, err := ioutil.ReadFile("./config.json")
if err != nil {
fmt.Println("读取json文件失败", err)
return
}
err = json.Unmarshal(jsonByte, &configData)
if err != nil {
fmt.Println("解析数据失败", err)
return
}
}
/**
主程序
*/
func main() {
//获取配置
kafkaTopic := configData.KafkaTopic //分区
KafkaConnection := configData.KafkaConnection //服务连接地址
//创建消费者
consumer, err := sarama.NewConsumer([]string{KafkaConnection}, nil)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
//获取主题分区
partitionList, err := consumer.Partitions(kafkaTopic) // 根据topic取到所有的分区
if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err)
return
}
//打印启动
fmt.Println("任务开始启动")
//遍历分区
for partition := range partitionList {
//针对每个分区创建一个对应的分区消费者
pc, err := consumer.ConsumePartition(kafkaTopic, int32(partition), sarama.OffsetNewest)
//判断是否连接成功
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
return
}
//关闭连接
defer pc.AsyncClose()
// 异步从每个分区消费信息
wg.Add(1) //+1
go func(sarama.PartitionConsumer) {
defer wg.Done() //-1
for msg := range pc.Messages() {
//记录接收
LogInfo("Info", "log", "kafka.log", string(msg.Value))
//获取消息信息
var messageInfoList map[string]interface{}
errResult := json.Unmarshal(msg.Value, &messageInfoList)
if errResult != nil {
continue
}
//获取记录Id
dataId, dataIdErr := strconv.Atoi(StrVal(messageInfoList["id"]))
if dataIdErr != nil {
dataId = 0
}
//获取消息结构体
messageDataInfo, messageDataInfoErr := messageInfoList["msg"].(string)
if !messageDataInfoErr {
continue
}
//string转msp
var messageInfo map[string]interface{}
messageInfoErr := json.Unmarshal([]byte(messageDataInfo), &messageInfo)
if messageInfoErr != nil {
continue
}
//获取任务Id
TaskId, TaskIdErr := messageInfo["TaskId"].(float64)
if !TaskIdErr {
TaskId = 0
}
//获取手机号码
phoneNum, phoneNumErr := messageInfo["phoneNum"].(string)
if !phoneNumErr {
phoneNum = ""
}
//获取意向
custIntention, custIntentionErr := messageInfo["custIntention"].(string)
if !custIntentionErr {
custIntention = ""
}
//判断当前客户是否具备完整信息
if TaskId == 0 || phoneNum == "" || custIntention == "" || dataId == 0 {
continue
}
//判断是否属于意向客户
if custIntention != "A" && custIntention != "B" {
continue
}
//输出
fmt.Println(phoneNum + ":已接收")
}
}(pc)
}
wg.Wait()
}
/**
记录日志
*/
func LogInfo(level, fileLogPath, fileLogName, content string) {
//初始化日志配置
var (
logFilePath = fileLogPath //文件存储路径
logFileName = fileLogName //文件名
)
// 日志文件
fileName := path.Join(logFilePath, logFileName)
// 写入文件
file, _ := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, 0666)
//关闭连接
defer file.Close()
// 实例化
logger := logrus.New()
// 日志级别
logger.SetLevel(logrus.DebugLevel)
// 设置输出
logger.Out = file
// 设置 rotatelogs,实现文件分割
logWriter, _ := rotatelogs.New(
// 分割后的文件名称
fileName+".%Y%m%d.log",
// 生成软链,指向最新日志文件
rotatelogs.WithLinkName(fileName),
// 设置最大保存时间(7天)
rotatelogs.WithMaxAge(7*24*time.Hour), //以hour为单位的整数
// 设置日志切割时间间隔(1天)
rotatelogs.WithRotationTime(1*time.Hour),
)
// hook机制的设置
writerMap := lfshook.WriterMap{
logrus.InfoLevel: logWriter,
logrus.FatalLevel: logWriter,
logrus.DebugLevel: logWriter,
logrus.WarnLevel: logWriter,
logrus.ErrorLevel: logWriter,
logrus.PanicLevel: logWriter,
}
//给logrus添加hook
logger.AddHook(lfshook.NewHook(writerMap, &logrus.JSONFormatter{
TimestampFormat: "2006-01-02 15:04:05",
}))
// 打印日志
switch {
case level == "Info": //正常打印
logger.WithFields(logrus.Fields{"content": content,}).Info()
case level == "Warn": //警告
logger.WithFields(logrus.Fields{"content": content,}).Warn()
case level == "Error": //错误
logger.WithFields(logrus.Fields{"content": content,}).Error()
case level == "Debug": //调试
logger.WithFields(logrus.Fields{"content": content,}).Debug()
default:
}
}
/**
类型转换
*/
func StrVal(value interface{}) string {
// interface 转 string
var key string
if value == nil {
return key
}
//类型判断
switch value.(type) {
case float64:
ft := value.(float64)
key = strconv.FormatFloat(ft, 'f', -1, 64)
case float32:
ft := value.(float32)
key = strconv.FormatFloat(float64(ft), 'f', -1, 64)
case int:
it := value.(int)
key = strconv.Itoa(it)
case uint:
it := value.(uint)
key = strconv.Itoa(int(it))
case int8:
it := value.(int8)
key = strconv.Itoa(int(it))
case uint8:
it := value.(uint8)
key = strconv.Itoa(int(it))
case int16:
it := value.(int16)
key = strconv.Itoa(int(it))
case uint16:
it := value.(uint16)
key = strconv.Itoa(int(it))
case int32:
it := value.(int32)
key = strconv.Itoa(int(it))
case uint32:
it := value.(uint32)
key = strconv.Itoa(int(it))
case int64:
it := value.(int64)
key = strconv.FormatInt(it, 10)
case uint64:
it := value.(uint64)
key = strconv.FormatUint(it, 10)
case string:
key = value.(string)
case []byte:
key = string(value.([]byte))
default:
newValue, _ := json.Marshal(value)
key = string(newValue)
}
//返回
return key
}然后我们在工作目录下创建一个目录名为log,用于存放日志。
我们直接在项目目录下运行如下命令启动脚本
go run ./
然后我们在kafka生产数据,如下

程序接收到队列数据后,直接展示

这样我们就用Go实现队列消费。
0条评论