Skip to content

Commit 95b0a89

Browse files
authored
Merge pull request #12 from rinocs/fix/fixed-concurrency-on-map
fix: fixed concurrency on basic map, moved to sync.Map on telegram fe…
2 parents 903a6ef + 5add01b commit 95b0a89

File tree

1 file changed

+42
-18
lines changed

1 file changed

+42
-18
lines changed

feeders/telegram.go

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"path/filepath"
88
"strconv"
9+
"sync"
910
"time"
1011

1112
"github.com/Matrix86/driplane/data"
@@ -28,9 +29,9 @@ type Telegram struct {
2829
context context.Context
2930
cancelContext context.CancelFunc
3031

31-
userMap map[int64]*tg.User
32-
channelMap map[int64]*tg.Channel
33-
chatMap map[int64]*tg.Chat
32+
userMap sync.Map
33+
channelMap sync.Map
34+
chatMap sync.Map
3435

3536
api *tg.Client
3637
}
@@ -41,9 +42,9 @@ func NewTelegramFeeder(conf map[string]string) (Feeder, error) {
4142
t := &Telegram{
4243
context: context,
4344
cancelContext: cancel,
44-
userMap: make(map[int64]*tg.User),
45-
channelMap: make(map[int64]*tg.Channel),
46-
chatMap: make(map[int64]*tg.Chat),
45+
userMap: sync.Map{},
46+
channelMap: sync.Map{},
47+
chatMap: sync.Map{},
4748
}
4849

4950
if val, ok := conf["telegram.phone_number"]; ok {
@@ -77,36 +78,45 @@ func NewTelegramFeeder(conf map[string]string) (Feeder, error) {
7778

7879
func (t *Telegram) updateMaps(e tg.Entities) {
7980
for userID, user := range e.Users {
80-
t.userMap[userID] = user
81+
t.userMap.Store(userID, user)
8182
}
8283

8384
for channelID, channel := range e.Channels {
84-
t.channelMap[channelID] = channel
85+
t.channelMap.Store(channelID, channel)
8586
}
8687

8788
for chatID, chat := range e.Chats {
88-
t.chatMap[chatID] = chat
89+
t.chatMap.Store(chatID, chat)
8990
}
9091
}
9192

9293
func (t *Telegram) getUserByID(id int64) (*tg.User, error) {
93-
if user, ok := t.userMap[id]; ok {
94+
value, ok := t.userMap.Load(id)
95+
if ok {
96+
user := value.(*tg.User)
9497
return user, nil
9598
}
99+
96100
return nil, fmt.Errorf("user not found")
97101
}
98102

99103
func (t *Telegram) getChannelByID(id int64) (*tg.Channel, error) {
100-
if channel, ok := t.channelMap[id]; ok {
104+
value, ok := t.channelMap.Load(id)
105+
if ok {
106+
channel := value.(*tg.Channel)
101107
return channel, nil
102108
}
109+
103110
return nil, fmt.Errorf("channel not found")
104111
}
105112

106113
func (t *Telegram) getChatByID(id int64) (*tg.Chat, error) {
107-
if chat, ok := t.chatMap[id]; ok {
114+
value, ok := t.chatMap.Load(id)
115+
if ok {
116+
chat := value.(*tg.Chat)
108117
return chat, nil
109118
}
119+
110120
return nil, fmt.Errorf("chat not found")
111121
}
112122

@@ -436,9 +446,9 @@ func (t *Telegram) getDialogs() {
436446
for _, chat := range dialogs.Chats {
437447
switch c := chat.(type) {
438448
case *tg.Chat:
439-
t.chatMap[c.ID] = c
449+
t.chatMap.Store(c.ID, c)
440450
case *tg.Channel:
441-
t.channelMap[c.ID] = c
451+
t.channelMap.Store(c.ID, c)
442452
}
443453
}
444454
}
@@ -579,12 +589,26 @@ func (t *Telegram) Start() {
579589
log.Info("Telegram: update recovery initialized and started, listening for events")
580590
// initializing the maps and printing on debug mode
581591
t.getDialogs()
582-
for _, c := range t.channelMap {
592+
t.channelMap.Range(func(key, value interface{}) bool {
593+
c, ok := value.(*tg.Channel)
594+
if !ok {
595+
log.Error("Telegram: not able to get channel from the Map")
596+
return true
597+
}
583598
log.Debug("Telegram: Channel: ID=%d AccessHash=%d Username=%s Title=%s ", c.ID, c.AccessHash, c.Username, c.Title)
584-
}
585-
for _, c := range t.chatMap {
599+
return true
600+
})
601+
602+
t.chatMap.Range(func(key, value interface{}) bool {
603+
c, ok := value.(*tg.Chat)
604+
if !ok {
605+
log.Error("Telegram: not able to get chat from the Map")
606+
return true
607+
}
586608
log.Debug("Telegram: Chat: ID=%d Title=%s", c.ID, c.Title)
587-
}
609+
610+
return true
611+
})
588612
},
589613
})
590614
}); err != nil {

0 commit comments

Comments
 (0)