Skip to content

Commit 3c7484a

Browse files
committed
Merge pull request #129 from Anislav/feature/refactor-notification-feed
New notification feed implementation
2 parents ec966c9 + 595ab35 commit 3c7484a

File tree

15 files changed

+974
-61
lines changed

15 files changed

+974
-61
lines changed

stream_framework/activity.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,3 +415,14 @@ def __repr__(self):
415415
message = 'AggregatedActivity(%s-%s) Actors %s: Objects %s' % (
416416
self.group, ','.join(verbs), actors, object_ids)
417417
return message
418+
419+
420+
class NotificationActivity(AggregatedActivity):
421+
422+
def __init__(self, *args, **kwargs):
423+
AggregatedActivity.__init__(self, *args, **kwargs)
424+
425+
# overrides AggregatedActivity is_read & is_seen instance methods
426+
self.is_seen = False
427+
self.is_read = False
428+

stream_framework/aggregators/base.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,10 @@ def rank(self, aggregated_activities):
125125
raise ValueError('not implemented')
126126

127127

128-
class RecentVerbAggregator(BaseAggregator):
128+
class RecentRankMixin(object):
129129

130130
'''
131-
Aggregates based on the same verb and same time period
131+
Most recently updated aggregated activities are ranked first.
132132
'''
133133

134134
def rank(self, aggregated_activities):
@@ -138,6 +138,13 @@ def rank(self, aggregated_activities):
138138
aggregated_activities.sort(key=lambda a: a.updated_at, reverse=True)
139139
return aggregated_activities
140140

141+
142+
class RecentVerbAggregator(RecentRankMixin, BaseAggregator):
143+
144+
'''
145+
Aggregates based on the same verb and same time period
146+
'''
147+
141148
def get_group(self, activity):
142149
'''
143150
Returns a group based on the day and verb
@@ -146,3 +153,20 @@ def get_group(self, activity):
146153
date = activity.time.date()
147154
group = '%s-%s' % (verb, date)
148155
return group
156+
157+
158+
class NotificationAggregator(RecentRankMixin, BaseAggregator):
159+
160+
'''
161+
Aggregates based on the same verb, object and day
162+
'''
163+
164+
def get_group(self, activity):
165+
'''
166+
Returns a group based on the verb, object and day
167+
'''
168+
verb = activity.verb.id
169+
object_id = activity.object_id
170+
date = activity.time.date()
171+
group = '%s-%s-%s' % (verb, object_id, date)
172+
return group

stream_framework/feeds/aggregated_feed/notification_feed.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,20 @@
55
import copy
66
import json
77
import logging
8+
import warnings
89

910
logger = logging.getLogger(__name__)
1011

12+
MODULE_IS_DEPRECATED = """
13+
Module stream_framework.feeds.aggregated_feed.notification_feed is deprecated.
14+
Please use stream_framework.feeds.notification_feed module.
15+
16+
Class stream_framework.feeds.aggregated_feed.notification_feed.RedisNotificationFeed
17+
is replaced by stream_framework.feeds.notification_feed.redis.RedisNotificationFeed
18+
"""
19+
20+
warnings.warn(MODULE_IS_DEPRECATED, DeprecationWarning)
21+
1122

1223
class NotificationFeed(AggregatedFeed):
1324

stream_framework/feeds/notification_feed/__init__.py

Whitespace-only changes.
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
from stream_framework.activity import NotificationActivity
2+
from stream_framework.aggregators.base import NotificationAggregator
3+
from stream_framework.feeds.aggregated_feed.base import AggregatedFeed
4+
from stream_framework.serializers.aggregated_activity_serializer import NotificationSerializer
5+
from stream_framework.storage.base_lists_storage import BaseListsStorage
6+
7+
import logging
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class BaseNotificationFeed(AggregatedFeed):
12+
'''
13+
Similar to an aggregated feed, but:
14+
- does not use the activity storage (serializes everything into the timeline storage)
15+
- tracks unseen/unread aggregated activities
16+
- enables counting of unseen/unread aggregated activities
17+
- enables marking of unseen/unread aggregated activities as seen/read
18+
'''
19+
20+
key_format = 'notification_feed:%(user_id)s'
21+
22+
timeline_serializer = NotificationSerializer
23+
aggregator_class = NotificationAggregator
24+
aggregated_activity_class = NotificationActivity
25+
activity_storage_class = None
26+
activity_serializer = None
27+
28+
# : the storage class responsible to keep track of unseen/unread activity ids
29+
markers_storage_class = BaseListsStorage
30+
31+
# : define whether or not to keep track of unseen activity ids
32+
track_unseen = True
33+
# : define whether or not to keep track of unread activity ids
34+
track_unread = True
35+
36+
# : the max number of tracked unseen/unread activity ids
37+
markers_max_length = 100
38+
39+
# : provides a part of the key used by markers_storage_class
40+
markers_key_format = 'notification_feed:%(user_id)s'
41+
42+
#: the key used for distributed locking
43+
lock_format = 'notification_feed:%(user_id)s:lock'
44+
45+
def __init__(self, user_id, **kwargs):
46+
super(BaseNotificationFeed, self).__init__(user_id, **kwargs)
47+
48+
if self.markers_storage_class is None:
49+
if self.track_unread or self.track_unseen:
50+
raise ValueError('markers_storage_class must be set in case the unseen/unread activities are tracked')
51+
else:
52+
if not issubclass(self.markers_storage_class, BaseListsStorage):
53+
error_format = 'markers_storage_class attribute must be subclass of %s, encountered class %s'
54+
message = error_format % (BaseListsStorage, self.markers_storage_class)
55+
raise ValueError(message)
56+
57+
markers_key = self.markers_key_format % {'user_id': user_id}
58+
self.feed_markers = self.markers_storage_class(key=markers_key,
59+
max_length=self.markers_max_length)
60+
61+
def count_unseen(self):
62+
'''
63+
Counts the number of aggregated activities which are unseen.
64+
'''
65+
if self.track_unseen:
66+
return self.feed_markers.count('unseen')
67+
68+
def count_unread(self):
69+
'''
70+
Counts the number of aggregated activities which are unread.
71+
'''
72+
if self.track_unread:
73+
return self.feed_markers.count('unread')
74+
75+
def get_notification_data(self):
76+
'''
77+
Provides custom notification data that is used by the transport layer
78+
when the feed is updated.
79+
'''
80+
notification_data = dict()
81+
82+
if self.track_unseen and self.track_unread:
83+
unseen_count, unread_count = self.feed_markers.count('unseen', 'unread')
84+
notification_data['unseen_count'] = unseen_count
85+
notification_data['unread_count'] = unread_count
86+
elif self.track_unseen:
87+
unseen_count = self.feed_markers.count('unseen')
88+
notification_data['unseen_count'] = unseen_count
89+
elif self.track_unread:
90+
unread_count = self.feed_markers.count('unread')
91+
notification_data['unread_count'] = unread_count
92+
93+
return notification_data
94+
95+
def update_markers(self, unseen_ids=None, unread_ids=None, operation='add'):
96+
'''
97+
Starts or stops tracking aggregated activities as unseen and/or unread.
98+
'''
99+
if self.markers_storage_class is not None:
100+
if operation not in ('add', 'remove'):
101+
raise TypeError('%s is not supported' % operation)
102+
103+
kwargs = dict()
104+
if unseen_ids is not None and self.track_unseen:
105+
kwargs['unseen'] = unseen_ids
106+
if unread_ids is not None and self.track_unread:
107+
kwargs['unread'] = unread_ids
108+
109+
func = getattr(self.feed_markers, operation)
110+
func(**kwargs)
111+
112+
# TODO use a real-time transport layer to notify for these updates
113+
114+
def get_activity_slice(self, start=None, stop=None, rehydrate=True):
115+
'''
116+
Retrieves a slice of aggregated activities and annotates them as read and/or seen.
117+
'''
118+
activities = super(BaseNotificationFeed, self).get_activity_slice(start, stop, rehydrate)
119+
if activities and self.markers_storage_class is not None:
120+
121+
if self.track_unseen and self.track_unread:
122+
unseen_ids, unread_ids = self.feed_markers.get('unseen', 'unread')
123+
elif self.track_unseen:
124+
unseen_ids = self.feed_markers.get('unseen')
125+
elif self.track_unread:
126+
unread_ids = self.feed_markers.get('unread')
127+
128+
for activity in activities:
129+
if self.track_unseen:
130+
activity.is_seen = activity.serialization_id not in unseen_ids
131+
if self.track_unread:
132+
activity.is_read = activity.serialization_id not in unread_ids
133+
134+
return activities
135+
136+
def add_many_aggregated(self, aggregated, *args, **kwargs):
137+
'''
138+
Adds the activities to the notification feed and marks them as unread/unseen.
139+
'''
140+
super(BaseNotificationFeed, self).add_many_aggregated(aggregated, *args, **kwargs)
141+
ids = [a.serialization_id for a in aggregated]
142+
self.update_markers(ids, ids, operation='add')
143+
144+
def remove_many_aggregated(self, aggregated, *args, **kwargs):
145+
'''
146+
Removes the activities from the notification feed and marks them as read/seen.
147+
'''
148+
super(BaseNotificationFeed, self).remove_many_aggregated(aggregated, *args, **kwargs)
149+
ids = [a.serialization_id for a in aggregated]
150+
self.update_markers(ids, ids, operation='remove')
151+
152+
def mark_activity(self, activity_id, seen=True, read=False):
153+
'''
154+
Marks the given aggregated activity as seen or read or both.
155+
'''
156+
self.mark_activities([activity_id], seen, read)
157+
158+
def mark_activities(self, activity_ids, seen=True, read=False):
159+
'''
160+
Marks all of the given aggregated activities as seen or read or both.
161+
'''
162+
unseen_ids = activity_ids if seen else []
163+
unread_ids = activity_ids if read else []
164+
self.update_markers(unseen_ids=unseen_ids,
165+
unread_ids=unread_ids,
166+
operation='remove')
167+
168+
def mark_all(self, seen=True, read=False):
169+
'''
170+
Marks all of the feed's aggregated activities as seen or read or both.
171+
'''
172+
args = []
173+
if seen and self.track_unseen:
174+
args.append('unseen')
175+
if read and self.track_unread:
176+
args.append('unread')
177+
self.feed_markers.flush(*args)
178+
179+
def delete(self):
180+
'''
181+
Deletes the feed and its markers.
182+
'''
183+
super(BaseNotificationFeed, self).delete()
184+
185+
args = []
186+
if self.track_unseen:
187+
args.append('unseen')
188+
if self.track_unread:
189+
args.append('unread')
190+
self.feed_markers.flush(*args)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from stream_framework.feeds.notification_feed.base import BaseNotificationFeed
2+
from stream_framework.storage.redis.lists_storage import RedisListsStorage
3+
from stream_framework.storage.redis.timeline_storage import RedisTimelineStorage
4+
5+
6+
class RedisNotificationFeed(BaseNotificationFeed):
7+
8+
markers_storage_class = RedisListsStorage
9+
timeline_storage_class = RedisTimelineStorage

0 commit comments

Comments
 (0)