Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 5a7ec20

Browse files
committed
pubsub: remove subscription management; naive implementation
1 parent 7beab58 commit 5a7ec20

File tree

3 files changed

+55
-21
lines changed

3 files changed

+55
-21
lines changed

pubsub.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package shell
33
import (
44
"encoding/base64"
55
"encoding/json"
6-
"sync"
6+
// "sync"
77
)
88

99
type b64String string
@@ -42,27 +42,35 @@ type PubSubRecord struct {
4242
///
4343

4444
type Subscription struct {
45-
topic string
46-
ch chan *PubSubRecord
45+
resp *Response
4746
}
4847

49-
func newSubscription(topic string, ch chan *PubSubRecord) *Subscription {
48+
func newSubscription(resp *Response) *Subscription {
5049
return &Subscription{
51-
topic: topic,
52-
ch: ch,
50+
resp: resp,
5351
}
5452
}
5553

56-
func (s *Subscription) Next() *PubSubRecord {
57-
return <-s.ch
54+
func (s *Subscription) Next() (*PubSubRecord, error) {
55+
if s.resp.Error != nil {
56+
return nil, s.resp.Error
57+
}
58+
59+
d := json.NewDecoder(s.resp.Output)
60+
61+
r := &PubSubRecord{}
62+
err := d.Decode(r)
63+
64+
return r, err
5865
}
5966

60-
func (s *Subscription) Topic() string {
61-
return s.topic
67+
func (s *Subscription) Cancel() error {
68+
return s.resp.Output.Close()
6269
}
6370

6471
///
6572

73+
/*
6674
type subscriptionHandler struct {
6775
topic string
6876
resp *Response
@@ -154,7 +162,7 @@ L:
154162
close(rdCh)
155163
}
156164
157-
sh.resp.Output.Close()
165+
//sh.resp.Output.Close()
158166
sh = nil
159167
}
160168
@@ -234,3 +242,4 @@ func (sm *subscriptionManager) dropHandler(sh *subscriptionHandler) {
234242
235243
delete(sm.subs, sh.topic)
236244
}
245+
*/

shell.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type Shell struct {
2323
url string
2424
httpcli *gohttp.Client
2525

26-
sm *subscriptionManager
26+
//sm *subscriptionManager
2727
}
2828

2929
func NewShell(url string) *Shell {
@@ -34,7 +34,7 @@ func NewShell(url string) *Shell {
3434
}
3535

3636
s := NewShellWithClient(url, c)
37-
s.sm = newSubscriptionManager(s)
37+
//s.sm = newSubscriptionManager(s)
3838

3939
return s
4040
}
@@ -695,11 +695,15 @@ func (s *Shell) ObjectPut(obj *IpfsObject) (string, error) {
695695
}
696696

697697
func (s *Shell) PubSubSubscribe(topic string) (*Subscription, error) {
698-
return s.sm.Sub(topic)
699-
}
698+
// connect
699+
req := s.newRequest("pubsub/sub", topic)
700+
701+
resp, err := req.Send(s.httpcli)
702+
if err != nil {
703+
return nil, err
704+
}
700705

701-
func (s *Shell) PubSubCancelSubscription(sub *Subscription) {
702-
s.sm.Drop(sub)
706+
return newSubscription(resp), nil
703707
}
704708

705709
func (s *Shell) PubSubPublish(topic, data string) error {

shell_test.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,18 +123,21 @@ func TestPubSub(t *testing.T) {
123123
s := NewShell(shellUrl)
124124

125125
var (
126+
topic = "test"
127+
126128
sub *Subscription
127129
err error
128130
wait1 <-chan time.Time
129131
wait = make(chan struct{})
130132
)
131133

132134
go func() {
133-
wait1 = time.After(time.Second)
135+
wait1 = time.After(20 * time.Millisecond) // workaround for go-ipfs#3304
134136
wait <- struct{}{}
135137
t.Log("subscribing...")
136-
sub, err = s.PubSubSubscribe("test")
138+
sub, err = s.PubSubSubscribe(topic)
137139
is.Nil(err)
140+
is.NotNil(sub)
138141
t.Log("sub: done")
139142

140143
wait <- struct{}{}
@@ -144,15 +147,33 @@ func TestPubSub(t *testing.T) {
144147
<-wait1
145148

146149
t.Log("publishing...")
147-
is.Nil(s.PubSubPublish("test", "Hello World!"))
150+
is.Nil(s.PubSubPublish(topic, "Hello World!"))
148151
t.Log("pub: done")
149152

150153
<-wait
151154

152155
t.Log("next()...")
153-
r := sub.Next()
156+
r, err := sub.Next()
154157
t.Log("next: done. ")
155158

156159
is.NotNil(r)
160+
is.Nil(err)
157161
is.Equal(r.Data, "Hello World!")
162+
163+
sub2, err := s.PubSubSubscribe(topic)
164+
is.Nil(err)
165+
is.NotNil(sub2)
166+
167+
is.Nil(s.PubSubPublish(topic, "Hallo Welt!"))
168+
169+
r, err = sub2.Next() // duplicate subscription error
170+
is.NotNil(err)
171+
is.Nil(r)
172+
173+
r, err = sub.Next()
174+
is.NotNil(r)
175+
is.Nil(err)
176+
is.Equal(r.Data, "Hallo Welt!")
177+
178+
is.Nil(sub.Cancel())
158179
}

0 commit comments

Comments
 (0)