订阅发布
概述
微服务是一种事件驱动的体系结构模式,因此 Vine 使用消息代理接口构建异步消息的概念。它可为用户无缝地运行 protobuf 类型,并自动编码和解码消息。
// Broker is an interface used for asynchronous messaging.
type Broker interface {
Init(...Option) error
Options() Options
Address() string
Connect() error
Disconnect() error
Publish(ctx context.Context, topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
String() string
}
默认情况下,vine 实现点对点 http 代理,但可以通过 plugins 替换实现。
发布消息
使用 topic 名称和服务客户端创建一个新的发布者
p := vine.NewEvent("events", service.Client())
发布 proto 消息
p.Publish(context.TODO(), &proto.Event{Name: "event"})
订阅
创建消息处理程序。它的签名应该是 func(context.Context, v interface{}) error
func ProcessEvent(ctx context.Context, event *proto.Event) error {
fmt.Printf("Got event %+v\n", event)
return nil
}
使用 topic 注册消息处理程序
vine.RegisterSubscriber("events", ProcessEvent)
完整实例可以看 example/pubsub
单独使用 Broker
Broker
也可以单独使用:
package main
import (
"fmt"
"time"
"github.com/vine-io/vine/service/broker"
log "github.com/vine-io/vine/service/logger"
)
func main() {
topic := "go.vine.topic.foo"
b := broker.NewBroker()
if err := b.Init(); err != nil {
log.Fatalf("Broker Init error: %v", err)
}
if err := b.Connect(); err != nil {
log.Fatalf("Broker Connect error: %v", err)
}
go func() {
// receive message from broker
b.Subscribe(topic, func(p broker.Event) error {
fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
return nil
})
}()
go func() {
<-time.After(time.Second * 1)
// publish message to broker
b.Publish(context.TODO(), topic, &broker.Message{Header: map[string]string{"a": "b"}, Body: []byte("hello world")})
}()
time.Sleep(time.Second * 2)
b.Disconnect()
}
最后修改 February 21, 2023: vine@v1.5 (cd019cf)