-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathpublish.go
115 lines (95 loc) · 2.6 KB
/
publish.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package pubsub
import (
"context"
"encoding/json"
"sync"
"github.com/golang/protobuf/proto"
"golang.org/x/sync/errgroup"
)
// notProtoMessageError is returned by Client.Publish when a protobuf publish
// is requested for a value that does not implement proto.Message.
type notProtoMessageError struct{}

// Error implements the error interface.
func (notProtoMessageError) Error() string {
	return "pubsub: message does not implement proto.Message"
}

// Publish encodes msg and publishes it to topic through the client's
// provider, running the client's publisher middleware around the call.
//
// When isJSON is true, msg is encoded with encoding/json; otherwise msg must
// implement proto.Message and is encoded as protobuf. An error is returned if
// msg cannot be encoded (including when isJSON is false and msg is not a
// proto.Message), or if the middleware chain or provider reports one.
func (c *Client) Publish(ctx context.Context, topic string, msg interface{}, isJSON bool) error {
	var (
		b   []byte
		err error
	)
	if isJSON {
		b, err = json.Marshal(msg)
	} else {
		// Use a checked assertion: an unchecked one would panic inside
		// library code when the caller passes a non-protobuf value.
		pm, ok := msg.(proto.Message)
		if !ok {
			return notProtoMessageError{}
		}
		b, err = proto.Marshal(pm)
	}
	if err != nil {
		return err
	}

	m := &Msg{Data: b}
	mw := chainPublisherMiddleware(c.Middleware...)
	// The innermost handler hands the encoded message to the provider.
	return mw(c.ServiceName, func(ctx context.Context, topic string, m *Msg) error {
		return c.Provider.Publish(ctx, topic, m)
	})(ctx, topic, m)
}
// A PublishResult holds the result from a call to Publish or PublishJSON.
type PublishResult struct {
// Ready is closed by Publish and PublishJSON once the publish calls they
// started have returned and Err has been recorded.
Ready chan struct{}
// Err is the error recorded for the publish, or nil if none was reported.
// It is only set before Ready is closed, so reading it earlier may observe
// nil even if the publish later fails. Prefer the Error method, which waits
// on Ready first.
Err error
}
// Error blocks until the receive from Ready completes (Publish and
// PublishJSON close Ready when they are done) and then returns Err, which is
// nil when no error was recorded.
func (p *PublishResult) Error() error {
// Wait for the publisher to signal completion before reading Err.
<-p.Ready
return p.Err
}
// Publish is a convenience function which publishes msg, encoded as
// protobuf, to topic on every client in the package-level clients collection.
//
// Publish returns immediately; the publishes run in the background. Call
// Error on the returned PublishResult (or wait for Ready to be closed) to
// obtain the first error reported by any client, or nil if all succeeded.
func Publish(ctx context.Context, topic string, msg proto.Message) *PublishResult {
	pr := &PublishResult{Ready: make(chan struct{})}

	// Register with publishWaitGroup before the goroutine starts. Adding from
	// inside the goroutine would let a WaitForAllPublishing call made right
	// after Publish returns observe a zero counter and return too early.
	publishWaitGroup.Add(1)
	go func() {
		defer publishWaitGroup.Done()
		// Deferred last so it runs first: Ready is closed only after Err has
		// been assigned below.
		defer close(pr.Ready)

		var eg errgroup.Group
		for _, c := range clients {
			cl := c // per-iteration copy for the closure (pre-Go 1.22 semantics)
			eg.Go(func() error {
				return cl.Publish(ctx, topic, msg, false)
			})
		}
		// Wait returns the first non-nil error from the group, or nil.
		pr.Err = eg.Wait()
	}()
	return pr
}
// PublishJSON is a convenience function which publishes obj, encoded as
// JSON, to topic on every client in the package-level clients collection.
//
// The clients are published to concurrently, but PublishJSON itself blocks
// until every publish call has returned. The returned PublishResult
// therefore already has Ready closed, and Err holds the first error recorded
// by any client (nil if all succeeded).
func PublishJSON(ctx context.Context, topic string, obj interface{}) *PublishResult {
	pr := &PublishResult{Ready: make(chan struct{})}

	var (
		wg sync.WaitGroup
		mu sync.Mutex // guards pr.Err, which several goroutines may set
	)
	for _, c := range clients {
		publishWaitGroup.Add(1)
		wg.Add(1)
		go func(c *Client) {
			defer publishWaitGroup.Done()
			defer wg.Done()
			if err := c.Publish(ctx, topic, obj, true); err != nil {
				// Serialize writes to the shared result and keep the first
				// error rather than letting concurrent writers race.
				mu.Lock()
				if pr.Err == nil {
					pr.Err = err
				}
				mu.Unlock()
			}
		}(c)
	}
	wg.Wait()
	close(pr.Ready)
	return pr
}
// WaitForAllPublishing blocks until publishWaitGroup reports that every
// publish registered on it by Publish and PublishJSON has finished.
func WaitForAllPublishing() {
publishWaitGroup.Wait()
}
// chainPublisherMiddleware combines the given middleware into a single
// publisher interceptor. The returned function takes a service name and the
// final handler, and yields a PublishHandler that, on every invocation, wraps
// the final handler with each middleware's PublisherMsgInterceptor so that
// mw[0] is outermost and the final handler is innermost, then calls the
// resulting handler.
func chainPublisherMiddleware(mw ...Middleware) func(serviceName string, next PublishHandler) PublishHandler {
	return func(serviceName string, final PublishHandler) PublishHandler {
		return func(ctx context.Context, topic string, m *Msg) error {
			handler := final
			// Wrap from the last middleware to the first, so the first one
			// supplied ends up as the outermost layer.
			for offset := range mw {
				idx := len(mw) - 1 - offset
				handler = mw[idx].PublisherMsgInterceptor(serviceName, handler)
			}
			return handler(ctx, topic, m)
		}
	}
}