Go
Go连接kafka消费队列
最近接到一个需求:开发一个脚本去消费 Kafka 队列,然后发送消息给意向客户。综合考虑后,最终决定使用 Go 来实现,很快就完成了脚本开发,特意跟大家分享。首先我们创建一个 Go 项目目录 messagePush,并初始化 mod,输入以下命令行:
go mod init messagePush
创建完之后,我们新建一个配置文件,用于存放配置信息,命名为config.json,配置如下
{ "kafka_connection": "172.31.128.1:9092", "kafka_topic": "AICustIntentionPush", "kafka_task_id": 0, "kafka_task_id_b": 0 }
然后创建运行文件,命名为main.go,代码如下
package main

// Kafka consumer script: reads customer-intent records from the configured
// topic and prints an acknowledgement for complete, intent-grade ("A"/"B")
// customers so they can receive a follow-up push.

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"path"
	"strconv"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	rotatelogs "github.com/lestrrat/go-file-rotatelogs"
	"github.com/rifflock/lfshook"
	"github.com/sirupsen/logrus"
)

// wg tracks one goroutine per topic partition so main can block until they exit.
var wg sync.WaitGroup

// Config is the parsed shape of config.json.
type Config struct {
	KafkaConnection string  `json:"kafka_connection"` // broker address, host:port
	KafkaTopic      string  `json:"kafka_topic"`      // topic to consume
	KafkaTaskId     float64 `json:"kafka_task_id"`    // NOTE(review): parsed but not read anywhere in this file
	KafkaTaskIdB    float64 `json:"kafka_task_id_b"`  // NOTE(review): parsed but not read anywhere in this file
}

// configData holds the loaded configuration; zero-valued if loading failed.
var configData Config

// init reads ./config.json into configData. On failure it only prints the
// error and returns, leaving configData zero-valued; main will then fail to
// connect with an empty broker address.
func init() {
	jsonByte, err := ioutil.ReadFile("./config.json")
	if err != nil {
		fmt.Println("读取json文件失败", err)
		return
	}
	if err = json.Unmarshal(jsonByte, &configData); err != nil {
		fmt.Println("解析数据失败", err)
	}
}

// main connects to Kafka, starts one consumer goroutine per partition of the
// configured topic, and blocks on wg.Wait() (the Messages channels never
// close under normal operation, so this runs as a daemon).
func main() {
	kafkaTopic := configData.KafkaTopic           // topic name (the original comment mislabelled this "partition")
	kafkaConnection := configData.KafkaConnection // broker connect address

	consumer, err := sarama.NewConsumer([]string{kafkaConnection}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}

	// All partition IDs of the topic.
	partitionList, err := consumer.Partitions(kafkaTopic)
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}

	fmt.Println("任务开始启动")

	// BUG FIX: the original used `for partition := range partitionList`,
	// which iterates slice INDICES, not the partition IDs Kafka returned.
	// Iterate the values instead (already int32, no conversion needed).
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition(kafkaTopic, partition, sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		// Deferred until main returns; main blocks on wg.Wait(), so this is
		// effectively "close on shutdown".
		defer pc.AsyncClose()

		wg.Add(1)
		// BUG FIX: the original declared an unnamed parameter
		// `go func(sarama.PartitionConsumer)` and silently captured pc from
		// the loop scope; bind it explicitly as an argument.
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				handleMessage(msg.Value)
			}
		}(pc)
	}
	wg.Wait()
}

// handleMessage logs one raw Kafka payload, validates it, and prints an
// acknowledgement for complete intent-grade records. Malformed or incomplete
// payloads are skipped silently (deliberate best-effort, as in the original).
func handleMessage(raw []byte) {
	// Log every received payload before any validation.
	LogInfo("Info", "log", "kafka.log", string(raw))

	var envelope map[string]interface{}
	if err := json.Unmarshal(raw, &envelope); err != nil {
		return
	}

	// Record id; defaults to 0 when absent or non-numeric, which fails the
	// completeness check below.
	dataId, err := strconv.Atoi(StrVal(envelope["id"]))
	if err != nil {
		dataId = 0
	}

	// The business payload is a JSON string nested under "msg".
	inner, ok := envelope["msg"].(string)
	if !ok {
		return
	}
	var messageInfo map[string]interface{}
	if err := json.Unmarshal([]byte(inner), &messageInfo); err != nil {
		return
	}

	// Extract fields, defaulting to zero values on type mismatch.
	taskId, ok := messageInfo["TaskId"].(float64)
	if !ok {
		taskId = 0
	}
	phoneNum, ok := messageInfo["phoneNum"].(string)
	if !ok {
		phoneNum = ""
	}
	custIntention, ok := messageInfo["custIntention"].(string)
	if !ok {
		custIntention = ""
	}

	// Require a complete record.
	if taskId == 0 || phoneNum == "" || custIntention == "" || dataId == 0 {
		return
	}
	// Only grade-A/B customers count as intent customers.
	if custIntention != "A" && custIntention != "B" {
		return
	}
	fmt.Println(phoneNum + ":已接收")
}

// LogInfo writes one JSON log entry at the given level ("Info", "Warn",
// "Error" or "Debug"; any other level is dropped) into
// fileLogPath/fileLogName, with date-named files rotated hourly and kept
// for 7 days.
//
// NOTE(review): this builds a fresh logrus instance, opens the file and
// configures rotation on every call — correct but wasteful; consider
// hoisting the setup into package init if logging becomes hot.
func LogInfo(level, fileLogPath, fileLogName, content string) {
	fileName := path.Join(fileLogPath, fileLogName)

	// BUG FIX: the original ignored both error returns below, so a failed
	// open or rotator setup silently produced a logger writing to a nil file.
	file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, 0666)
	if err != nil {
		fmt.Fprintln(os.Stderr, "open log file:", err)
		return
	}
	defer file.Close()

	logger := logrus.New()
	logger.SetLevel(logrus.DebugLevel)
	logger.Out = file

	// Rotation: files named fileName.YYYYMMDD.log, a symlink at fileName
	// pointing to the newest file, 7-day retention, hourly rotation
	// (the original comment claimed daily, but the code says 1*time.Hour).
	logWriter, err := rotatelogs.New(
		fileName+".%Y%m%d.log",
		rotatelogs.WithLinkName(fileName),
		rotatelogs.WithMaxAge(7*24*time.Hour),
		rotatelogs.WithRotationTime(1*time.Hour),
	)
	if err != nil {
		fmt.Fprintln(os.Stderr, "init log rotation:", err)
		return
	}

	// Route every level through the rotating writer.
	writerMap := lfshook.WriterMap{
		logrus.InfoLevel:  logWriter,
		logrus.FatalLevel: logWriter,
		logrus.DebugLevel: logWriter,
		logrus.WarnLevel:  logWriter,
		logrus.ErrorLevel: logWriter,
		logrus.PanicLevel: logWriter,
	}
	logger.AddHook(lfshook.NewHook(writerMap, &logrus.JSONFormatter{
		TimestampFormat: "2006-01-02 15:04:05",
	}))

	entry := logger.WithFields(logrus.Fields{"content": content})
	// Idiom: switch on the value instead of cascaded equality cases.
	switch level {
	case "Info":
		entry.Info()
	case "Warn":
		entry.Warn()
	case "Error":
		entry.Error()
	case "Debug":
		entry.Debug()
	}
}

// StrVal converts an arbitrary interface value to its string form.
// Numeric types are rendered in base 10 (floats with the minimal number of
// digits); nil yields ""; unrecognised types are JSON-marshalled.
func StrVal(value interface{}) string {
	if value == nil {
		return ""
	}
	// Idiom: a bound type switch replaces the original's type switch with a
	// redundant assertion inside every case.
	switch v := value.(type) {
	case string:
		return v
	case []byte:
		return string(v)
	case float64:
		return strconv.FormatFloat(v, 'f', -1, 64)
	case float32:
		// bitSize 64 kept to match the original's output exactly.
		return strconv.FormatFloat(float64(v), 'f', -1, 64)
	case int:
		return strconv.Itoa(v)
	case int8:
		return strconv.Itoa(int(v))
	case int16:
		return strconv.Itoa(int(v))
	case int32:
		return strconv.Itoa(int(v))
	case int64:
		return strconv.FormatInt(v, 10)
	case uint:
		return strconv.Itoa(int(v))
	case uint8:
		return strconv.Itoa(int(v))
	case uint16:
		return strconv.Itoa(int(v))
	case uint32:
		return strconv.Itoa(int(v))
	case uint64:
		return strconv.FormatUint(v, 10)
	default:
		b, _ := json.Marshal(v)
		return string(b)
	}
}
然后我们在工作目录下创建一个目录名为log,用于存放日志。
我们直接在项目目录下运行如下命令启动脚本
go run ./
然后我们在kafka生产数据,如下
程序接收到队列数据后,直接展示
这样我们就用Go实现队列消费。
0条评论