如果 canal 投递到 Kafka 时未开启 JSON 格式，那么投递到 Kafka 中的数据是原始的 byte 数据流。官方文档写得太差，最后是从 canal-go 项目中找到的解析代码：
import (
pb "github.com/CanalClient/canal-go/protocol"
"github.com/Shopify/sarama"
"github.com/gogo/protobuf/proto"
)
// Processor decodes one Kafka message that canal delivered in its raw
// (non-JSON) form and prints the entries it carries via printEntry.
//
// msg.Value is unmarshalled as a pb.Packet. Only packets whose type is
// pb.PacketType_MESSAGES are decoded further: the packet body is unmarshalled
// as pb.Messages and every element of it as a pb.Entry. For any other packet
// type printEntry receives no entries and Processor returns nil.
//
// It returns an *mq.CustomerError when msg.Topic differs from
// c.GetTopicName(), or the first protobuf unmarshal error encountered.
func Processor(msg *sarama.ConsumerMessage) (err error) {
	if msg.Topic != c.GetTopicName() {
		return &mq.CustomerError{"topic名称不匹配"}
	}

	var packet pb.Packet
	if err = proto.Unmarshal(msg.Value, &packet); err != nil {
		return err
	}

	result := new(pb.Message)
	if packet.Type == pb.PacketType_MESSAGES {
		var batch pb.Messages
		if err = proto.Unmarshal(packet.Body, &batch); err != nil {
			return err
		}

		var (
			decoded []pb.Entry
			scratch pb.Entry // decode target reused for every element, then copied into decoded
		)
		for _, raw := range batch.Messages {
			if err = proto.Unmarshal(raw, &scratch); err != nil {
				return err
			}
			decoded = append(decoded, scratch)
		}

		result.Entries = decoded
		result.Id = batch.GetBatchId()
	}

	printEntry(result.Entries)
	return nil
}
// printEntry writes a human-readable dump of the row changes contained in
// entrys to standard output.
//
// Entries of type TRANSACTIONBEGIN or TRANSACTIONEND are skipped. For every
// other entry the store value is unmarshalled as a pb.RowChange, a header line
// with the binlog file name, offset, schema, table and header event type is
// printed, and then the columns of each row are printed with printColumn:
// the before-columns for DELETE, the after-columns for INSERT, and both
// (labelled "before" / "after") for any other event type.
//
// An unmarshal failure is handed to checkError, which terminates the process.
func printEntry(entrys []pb.Entry) {
	for i := range entrys {
		// Take the element by index so each pb.Entry is not copied per iteration.
		entry := &entrys[i]

		switch entry.GetEntryType() {
		case pb.EntryType_TRANSACTIONBEGIN, pb.EntryType_TRANSACTIONEND:
			// Transaction markers are skipped; only row-change entries are printed.
			continue
		}

		// rowChange comes straight from new(), so it can never be nil and needs
		// no nil guard before use.
		rowChange := new(pb.RowChange)
		checkError(proto.Unmarshal(entry.GetStoreValue(), rowChange))

		eventType := rowChange.GetEventType()
		header := entry.GetHeader()
		fmt.Printf("================> binlog[%s : %d],name[%s,%s], eventType: %s\n",
			header.GetLogfileName(), header.GetLogfileOffset(),
			header.GetSchemaName(), header.GetTableName(), header.GetEventType())

		for _, rowData := range rowChange.GetRowDatas() {
			switch eventType {
			case pb.EventType_DELETE:
				printColumn(rowData.GetBeforeColumns())
			case pb.EventType_INSERT:
				printColumn(rowData.GetAfterColumns())
			default:
				fmt.Println("-------> before")
				printColumn(rowData.GetBeforeColumns())
				fmt.Println("-------> after")
				printColumn(rowData.GetAfterColumns())
			}
		}
	}
}
// printColumn prints one line per column to standard output, showing the
// column's name, its value and its "updated" flag.
func printColumn(columns []*pb.Column) {
	for i := 0; i < len(columns); i++ {
		column := columns[i]
		line := fmt.Sprintf("%s : %s update= %t", column.GetName(), column.GetValue(), column.GetUpdated())
		fmt.Println(line)
	}
}
// checkError aborts the program when err is non-nil: it writes
// "Fatal error: <message>" to standard error and exits with status 1.
// A nil err is a no-op.
func checkError(err error) {
	if err == nil {
		return
	}
	fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
	os.Exit(1)
}