Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 35dd31f

Browse files
committed
reflect recent server-side pubsub changes, e.g. flushing
1 parent 1ce3c0c commit 35dd31f

File tree

3 files changed

+26
-205
lines changed

3 files changed

+26
-205
lines changed

pubsub.go

Lines changed: 12 additions & 178 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ package shell
33
import (
44
"encoding/base64"
55
"encoding/json"
6-
// "sync"
76
)
87

8+
// floodsub uses base64 encoding for just about everything.
9+
// To Decode the base64 while also decoding the JSON the type b64String was added.
910
type b64String string
1011

12+
// UnmarshalJSON implements the json.Unmarshaler interface.
1113
func (bs *b64String) UnmarshalJSON(in []byte) error {
1214
var b64 string
1315

@@ -32,29 +34,36 @@ func (bs *b64String) Marshal() (string, error) {
3234

3335
///
3436

37+
// PubSubRecord is a record received via PubSub.
3538
type PubSubRecord struct {
3639
From string `json:"from"`
3740
Data b64String `json:"data"`
3841
SeqNo b64String `json:"seqno"`
3942
TopicIDs []string `json:"topicIDs"`
4043
}
4144

45+
// DataString returns the string representation of the data field.
4246
func (r PubSubRecord) DataString() string {
4347
return string(r.Data)
4448
}
4549

4650
///
4751

52+
// PubSubSubscription allow you to receive pubsub records that where published on the network.
4853
type PubSubSubscription struct {
4954
resp *Response
5055
}
5156

5257
func newPubSubSubscription(resp *Response) *PubSubSubscription {
53-
return &PubSubSubscription{
58+
sub := &PubSubSubscription{
5459
resp: resp,
5560
}
61+
62+
sub.Next() // skip empty element used for flushing
63+
return sub
5664
}
5765

66+
// Next waits for the next record and returns that.
5867
func (s *PubSubSubscription) Next() (*PubSubRecord, error) {
5968
if s.resp.Error != nil {
6069
return nil, s.resp.Error
@@ -68,186 +77,11 @@ func (s *PubSubSubscription) Next() (*PubSubRecord, error) {
6877
return r, err
6978
}
7079

80+
// Cancel cancels the given subscription.
7181
func (s *PubSubSubscription) Cancel() error {
7282
if s.resp.Output == nil {
7383
return nil
7484
}
7585

7686
return s.resp.Output.Close()
7787
}
78-
79-
///
80-
81-
/*
82-
type subscriptionHandler struct {
83-
topic string
84-
resp *Response
85-
86-
readers map[chan *PubSubRecord]struct{}
87-
88-
stop chan struct{}
89-
add, drop chan chan *PubSubRecord
90-
91-
failReason error
92-
}
93-
94-
func newPubSubSubscriptionHandler(topic string, resp *Response) *subscriptionHandler {
95-
sh := &subscriptionHandler{
96-
// the topic that is being handled
97-
topic: topic,
98-
// stop shuts down the subscription handler.
99-
stop: make(chan struct{}),
100-
// readers is the set of listeners
101-
readers: make(map[chan *PubSubRecord]struct{}),
102-
//add is the channel in which you add more listeners
103-
add: make(chan chan *PubSubRecord),
104-
//drop is the channel to which you send channels
105-
drop: make(chan chan *PubSubRecord),
106-
resp: resp,
107-
}
108-
109-
go sh.work()
110-
111-
return sh
112-
}
113-
114-
func (sh *subscriptionHandler) work() {
115-
readOne := func(ch chan *PubSubRecord, errCh chan error) {
116-
d := json.NewDecoder(sh.resp.Output)
117-
if sh.resp.Error != nil {
118-
errCh <- sh.resp.Error
119-
return
120-
}
121-
122-
r := PubSubRecord{}
123-
err := d.Decode(&r)
124-
if err != nil {
125-
errCh <- err
126-
return
127-
}
128-
129-
ch <- &r
130-
}
131-
132-
ch := make(chan *PubSubRecord)
133-
errCh := make(chan error)
134-
135-
go readOne(ch, errCh)
136-
137-
L:
138-
for {
139-
select {
140-
// remove a rdCh from pool
141-
case ch := <-sh.drop:
142-
delete(sh.readers, ch)
143-
144-
if len(sh.readers) == 0 {
145-
break L
146-
}
147-
148-
// add a rdCh to pool
149-
case ch := <-sh.add:
150-
sh.readers[ch] = struct{}{}
151-
152-
case r := <-ch:
153-
for rdCh := range sh.readers {
154-
rdCh <- r
155-
}
156-
157-
go readOne(ch, errCh)
158-
159-
case err := <-errCh:
160-
sh.failReason = err
161-
break L
162-
163-
case <-sh.stop:
164-
break L
165-
}
166-
}
167-
168-
for rdCh := range sh.readers {
169-
delete(sh.readers, rdCh)
170-
close(rdCh)
171-
}
172-
173-
//sh.resp.Output.Close()
174-
sh = nil
175-
}
176-
177-
func (sh *subscriptionHandler) Stop() {
178-
sh.stop <- struct{}{}
179-
}
180-
181-
func (sh *subscriptionHandler) Sub() *PubSubSubscription {
182-
ch := make(chan *PubSubRecord)
183-
184-
sh.add <- ch
185-
186-
return newPubSubSubscription(sh.topic, ch)
187-
}
188-
189-
func (sh *subscriptionHandler) Drop(s *PubSubSubscription) {
190-
sh.drop <- s.ch
191-
}
192-
193-
func (sh *subscriptionHandler) Error() error {
194-
return sh.failReason
195-
}
196-
197-
///
198-
199-
type subscriptionManager struct {
200-
sync.Mutex
201-
202-
s *Shell
203-
subs map[string]*subscriptionHandler
204-
}
205-
206-
func newPubSubSubscriptionManager(s *Shell) *subscriptionManager {
207-
return &subscriptionManager{
208-
s: s,
209-
subs: make(map[string]*subscriptionHandler),
210-
}
211-
}
212-
213-
func (sm *subscriptionManager) Sub(topic string) (*PubSubSubscription, error) {
214-
// lock
215-
sm.Lock()
216-
defer sm.Unlock()
217-
218-
// check if already subscribed
219-
sh := sm.subs[topic]
220-
if sh == nil { // if not, do so!
221-
// connect
222-
req := sm.s.newRequest("pubsub/sub", topic)
223-
resp, err := req.Send(sm.s.httpcli)
224-
if err != nil {
225-
return nil, err
226-
}
227-
228-
// pass connection to handler and add handler to manager
229-
sh = newPubSubSubscriptionHandler(topic, resp)
230-
sm.subs[topic] = sh
231-
}
232-
233-
// success
234-
return sh.Sub(), nil
235-
}
236-
237-
func (sm *subscriptionManager) Drop(s *PubSubSubscription) {
238-
sm.Lock()
239-
defer sm.Unlock()
240-
241-
sh := sm.subs[s.topic]
242-
if sh != nil {
243-
sh.Drop(s)
244-
}
245-
}
246-
247-
func (sm *subscriptionManager) dropHandler(sh *subscriptionHandler) {
248-
sm.Lock()
249-
defer sm.Unlock()
250-
251-
delete(sm.subs, sh.topic)
252-
}
253-
*/

shell.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,7 @@ func NewShell(url string) *Shell {
3131
},
3232
}
3333

34-
s := NewShellWithClient(url, c)
35-
//s.sm = newPubSubSubscriptionManager(s)
36-
37-
return s
34+
return NewShellWithClient(url, c)
3835
}
3936

4037
func NewShellWithClient(url string, c *gohttp.Client) *Shell {

shell_test.go

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -125,50 +125,40 @@ func TestPubSub(t *testing.T) {
125125
var (
126126
topic = "test"
127127

128-
sub *Subscription
129-
err error
130-
wait1 <-chan time.Time
131-
wait = make(chan struct{})
128+
sub *PubSubSubscription
129+
err error
132130
)
133131

134-
go func() {
135-
wait1 = time.After(20 * time.Millisecond) // workaround for go-ipfs#3304
136-
wait <- struct{}{}
137-
t.Log("subscribing...")
138-
sub, err = s.PubSubSubscribe(topic)
139-
is.Nil(err)
140-
is.NotNil(sub)
141-
t.Log("sub: done")
142-
143-
wait <- struct{}{}
144-
}()
132+
t.Log("subscribing...")
133+
sub, err = s.PubSubSubscribe(topic)
134+
is.Nil(err)
135+
is.NotNil(sub)
136+
t.Log("sub: done")
145137

146-
<-wait
147-
<-wait1
138+
time.Sleep(10 * time.Millisecond)
148139

149140
t.Log("publishing...")
150141
is.Nil(s.PubSubPublish(topic, "Hello World!"))
151142
t.Log("pub: done")
152143

153-
<-wait
154-
155144
t.Log("next()...")
156145
r, err := sub.Next()
157146
t.Log("next: done. ")
158147

159148
is.NotNil(r)
160149
is.Nil(err)
161-
is.Equal(r.Data, "Hello World!")
150+
is.Equal(r.DataString(), "Hello World!")
162151

163152
sub2, err := s.PubSubSubscribe(topic)
164153
is.Nil(err)
165154
is.NotNil(sub2)
166155

167156
is.Nil(s.PubSubPublish(topic, "Hallo Welt!"))
168157

169-
r, err = sub2.Next() // duplicate subscription error
170-
is.NotNil(err)
171-
is.Nil(r)
158+
r, err = sub2.Next()
159+
is.Nil(err)
160+
is.NotNil(r)
161+
is.Equal(r.Data, "Hallo Welt!")
172162

173163
r, err = sub.Next()
174164
is.NotNil(r)

0 commit comments

Comments
 (0)