如果canal投递到kafka未开启json格式,那么投递到kafka中的数据是原始byte数据流 官方文档太差,最后从canal-go项目中找到解析代码

import (
  pb "github.com/CanalClient/canal-go/protocol"
	"github.com/Shopify/sarama"
	"github.com/gogo/protobuf/proto"
	)

func Processor(msg *sarama.ConsumerMessage) (err error) {
	if msg.Topic != c.GetTopicName() {
		return &mq.CustomerError{"topic名称不匹配"}
	}
	packet := new(pb.Packet)
	messages := new(pb.Messages)
	message := new(pb.Message)

	err = proto.Unmarshal(msg.Value, packet)
	if err != nil {
		return err
	}

	var items []pb.Entry
	var entry pb.Entry
	switch packet.Type {
	case pb.PacketType_MESSAGES:
		err := proto.Unmarshal(packet.Body, messages)
		if err != nil {
			return err
		}

		for _, value := range messages.Messages {
			err := proto.Unmarshal(value, &entry)
			if err != nil {
				return err
			}
			items = append(items, entry)
		}

		message.Entries = items
		message.Id = messages.GetBatchId()

	}

	printEntry(message.Entries)
	return
}

func printEntry(entrys []pb.Entry) {

	for _, entry := range entrys {
		if entry.GetEntryType() == pb.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pb.EntryType_TRANSACTIONEND {
			continue
		}
		rowChange := new(pb.RowChange)

		err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
		checkError(err)
		if rowChange != nil {
			eventType := rowChange.GetEventType()
			header := entry.GetHeader()
			fmt.Println(fmt.Sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.GetSchemaName(), header.GetTableName(), header.GetEventType()))

			for _, rowData := range rowChange.GetRowDatas() {
				if eventType == pb.EventType_DELETE {
					printColumn(rowData.GetBeforeColumns())
				} else if eventType == pb.EventType_INSERT {
					printColumn(rowData.GetAfterColumns())
				} else {
					fmt.Println("-------> before")
					printColumn(rowData.GetBeforeColumns())
					fmt.Println("-------> after")
					printColumn(rowData.GetAfterColumns())
				}
			}
		}
	}
}
func printColumn(columns []*pb.Column) {
	for _, col := range columns {
		fmt.Println(fmt.Sprintf("%s : %s  update= %t", col.GetName(), col.GetValue(), col.GetUpdated()))
	}
}

func checkError(err error) {
	if err != nil {
		fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
		os.Exit(1)
	}
}