@@ -39,19 +39,23 @@ type PubSubRecord struct {
3939 TopicIDs []string `json:"topicIDs"`
4040}
4141
42+ func (r PubSubRecord ) DataString () string {
43+ return string (r .Data )
44+ }
45+
4246///
4347
44- type Subscription struct {
48+ type PubSubSubscription struct {
4549 resp * Response
4650}
4751
48- func newSubscription (resp * Response ) * Subscription {
49- return & Subscription {
52+ func newPubSubSubscription (resp * Response ) * PubSubSubscription {
53+ return & PubSubSubscription {
5054 resp : resp ,
5155 }
5256}
5357
54- func (s * Subscription ) Next () (* PubSubRecord , error ) {
58+ func (s * PubSubSubscription ) Next () (* PubSubRecord , error ) {
5559 if s .resp .Error != nil {
5660 return nil , s .resp .Error
5761 }
@@ -64,7 +68,11 @@ func (s *Subscription) Next() (*PubSubRecord, error) {
6468 return r , err
6569}
6670
67- func (s * Subscription ) Cancel () error {
71+ func (s * PubSubSubscription ) Cancel () error {
72+ if s .resp .Output == nil {
73+ return nil
74+ }
75+
6876 return s .resp .Output .Close ()
6977}
7078
@@ -83,7 +91,7 @@ type subscriptionHandler struct {
8391 failReason error
8492}
8593
86- func newSubscriptionHandler (topic string, resp *Response) *subscriptionHandler {
94+ func newPubSubSubscriptionHandler (topic string, resp *Response) *subscriptionHandler {
8795 sh := &subscriptionHandler{
8896 // the topic that is being handled
8997 topic: topic,
@@ -170,15 +178,15 @@ func (sh *subscriptionHandler) Stop() {
170178 sh.stop <- struct{}{}
171179}
172180
173- func (sh *subscriptionHandler) Sub() *Subscription {
181+ func (sh *subscriptionHandler) Sub() *PubSubSubscription {
174182 ch := make(chan *PubSubRecord)
175183
176184 sh.add <- ch
177185
178- return newSubscription (sh.topic, ch)
186+ return newPubSubSubscription (sh.topic, ch)
179187}
180188
181- func (sh *subscriptionHandler) Drop(s *Subscription ) {
189+ func (sh *subscriptionHandler) Drop(s *PubSubSubscription ) {
182190 sh.drop <- s.ch
183191}
184192
@@ -195,14 +203,14 @@ type subscriptionManager struct {
195203 subs map[string]*subscriptionHandler
196204}
197205
198- func newSubscriptionManager (s *Shell) *subscriptionManager {
206+ func newPubSubSubscriptionManager (s *Shell) *subscriptionManager {
199207 return &subscriptionManager{
200208 s: s,
201209 subs: make(map[string]*subscriptionHandler),
202210 }
203211}
204212
205- func (sm *subscriptionManager) Sub(topic string) (*Subscription , error) {
213+ func (sm *subscriptionManager) Sub(topic string) (*PubSubSubscription , error) {
206214 // lock
207215 sm.Lock()
208216 defer sm.Unlock()
@@ -218,15 +226,15 @@ func (sm *subscriptionManager) Sub(topic string) (*Subscription, error) {
218226 }
219227
220228 // pass connection to handler and add handler to manager
221- sh = newSubscriptionHandler (topic, resp)
229+ sh = newPubSubSubscriptionHandler (topic, resp)
222230 sm.subs[topic] = sh
223231 }
224232
225233 // success
226234 return sh.Sub(), nil
227235}
228236
229- func (sm *subscriptionManager) Drop(s *Subscription ) {
237+ func (sm *subscriptionManager) Drop(s *PubSubSubscription ) {
230238 sm.Lock()
231239 defer sm.Unlock()
232240
0 commit comments