11#include "filter_module.h"
22#include "fsort.h"
33#include "fsort_utils.h"
4+ #include "thread_pool.h"
5+ #include "client_list.h"
6+
7+ static FSortPool_t * sortPool ;
8+
9+ static blocked_list_t * fsortBlocked ;
410
511int FSortBust_RedisCommand (RedisModuleCtx * ctx , RedisModuleString * * argv , int argc ) {
612 RedisModule_AutoMemory (ctx );
@@ -26,53 +32,129 @@ int FSortBust_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
2632}
2733
2834int FSortAggregate_RedisCommand (RedisModuleCtx * ctx , RedisModuleString * * argv , int argc ) {
35+ const char * cmd ;
2936 RedisModule_AutoMemory (ctx );
3037
3138 if (argc != 4 ) {
3239 return RedisModule_WrongArity (ctx );
3340 }
3441
35- pthread_t tid ;
42+ cmd = strdup (GetArgvString (ctx , argv , argc ));
43+
3644 RedisModuleBlockedClient * bc = RedisModule_BlockClient (ctx ,NULL ,NULL ,NULL ,0 );
3745
3846 void * * targ = RedisModule_Alloc (sizeof (void * )* 3 );
3947 targ [0 ] = bc ;
4048 targ [1 ] = (void * )(unsigned long ) argc ;
4149 targ [2 ] = (void * )(RedisModuleString * * )argv ;
4250
43- if (pthread_create (& tid ,NULL ,fsort_aggregate_thread ,targ ) != 0 ) {
44- RedisModule_AbortBlock (bc );
45- return RedisModule_ReplyWithError (ctx ,"-ERR Can't start thread" );
51+ //run threads
52+ if (tpool_work_exists (sortPool , cmd ) == 1 ) {
53+ blocked_list_add_client (fsortBlocked , cmd , bc , targ );
54+ } else {
55+ tpool_add_work (sortPool , fsort_aggregate_thread , targ , cmd );
4656 }
47-
57+ free ((char * )cmd );
58+
4859 return REDISMODULE_OK ;
4960}
5061
5162int FSort_RedisCommand (RedisModuleCtx * ctx , RedisModuleString * * argv , int argc ) {
63+ const char * cmd ;
64+ int pssType = 0 ;
65+ int fflType = 0 ;
66+ int longRunning = 0 ;
67+ pthread_t tid ;
68+
5269 RedisModule_AutoMemory (ctx );
5370
5471 if (argc < 7 ) {
5572 return RedisModule_WrongArity (ctx );
5673 }
5774
58- pthread_t tid ;
5975 RedisModuleBlockedClient * bc = RedisModule_BlockClient (ctx ,NULL ,NULL ,NULL ,0 );
6076
61- void * * targ = RedisModule_Alloc ( sizeof ( void * ) * 3 );
62- targ [ 0 ] = bc ;
63- targ [ 1 ] = ( void * )( unsigned long ) argc ;
64- targ [ 2 ] = ( void * )( RedisModuleString * * ) argv ;
77+ FSortObj_t * sort = fsort_new_fsort ( );
78+ RedisModuleCtx * thSafeCtx = RedisModule_GetThreadSafeContext ( bc );
79+ sort -> ctx = ctx ;
80+ int parseRes = fsort_parse_args ( sort , thSafeCtx , argv , argc ) ;
6581
66- if (pthread_create (& tid ,NULL ,fsort_fsort_thread ,targ ) != 0 ) {
67- RedisModule_AbortBlock (bc );
68- return RedisModule_ReplyWithError (ctx ,"-ERR Can't start thread" );
82+ fsort_form_keys (sort );
83+
84+ if (parseRes != REDISMODULE_OK ) {
85+ fsort_free_fsort (sort );
86+ RedisModule_UnblockClient (bc ,NULL );
87+ RedisModule_FreeThreadSafeContext (thSafeCtx );
88+ return REDISMODULE_ERR ;
6989 }
7090
91+ pssType = fsort_key_type (sort , sort -> pssKey );
92+ fflType = fsort_key_type (sort , sort -> fflKey );
93+
94+ if (pssType == REDISMODULE_KEYTYPE_EMPTY || fflType == REDISMODULE_KEYTYPE_EMPTY ) {
95+ longRunning = 1 ;
96+ //decide on which tasks block
97+ if (RedisModule_StringCompare (sort -> pssKey , sort -> fflKey ) == 0 ) {
98+ //only sort, setting command to psskey
99+ cmd = RedisModule_StringPtrLen (sort -> pssKey , NULL );
100+ } else {
101+ //if no PSS key block all commands
102+ //if no ffl block only filtering
103+ //only elseif
104+ if (pssType == REDISMODULE_KEYTYPE_EMPTY ) {
105+ cmd = RedisModule_StringPtrLen (sort -> pssKey , NULL );
106+ } else if (fflType == REDISMODULE_KEYTYPE_EMPTY ) {
107+ cmd = RedisModule_StringPtrLen (sort -> fflKey , NULL );
108+ }
109+ }
110+ cmd = strdup (cmd );
111+ }
112+
113+ void * * targ = RedisModule_Alloc (sizeof (void * )* 3 );
114+ targ [0 ] = bc ;
115+ targ [1 ] = (void * )sort ;
116+ //if all keys are present, we dont need thread pool
117+ if (longRunning == 1 ) {
118+ //run threads
119+ if (tpool_work_exists (sortPool , cmd ) == 1 ) {
120+ blocked_list_add_client (fsortBlocked , cmd , bc , targ );
121+ } else {
122+ tpool_add_work (sortPool , fsort_fsort_thread , targ , cmd );
123+ }
124+ free ((char * )cmd );
125+ } else {
126+ if (pthread_create (& tid ,NULL ,fsort_fsort_thread ,targ ) != 0 ) {
127+ RedisModule_AbortBlock (bc );
128+ RedisModule_FreeThreadSafeContext (thSafeCtx );
129+ free (targ );
130+ return RedisModule_ReplyWithError (ctx ,"-ERR Can't start thread" );
131+ }
132+ }
133+ RedisModule_FreeThreadSafeContext (thSafeCtx );
71134 return REDISMODULE_OK ;
72135}
73136
74- int RedisModule_OnLoad (RedisModuleCtx * ctx ) {
137+ int FSort_onWorkFinish (FSortWork_t * work ) {
138+ printf ("[rm]WorkFinish %s\n" , work -> cmd );
139+ blocked_clients_t * clientList = blocked_list_find_cmd (fsortBlocked , work -> cmd );
140+
141+ if (clientList != NULL ) {
142+ for (size_t i = 0 ; i < clientList -> size ; i ++ ) {
143+ blocked_client_t * bct = clientList -> clients [i ];
144+ tpool_add_work (sortPool , fsort_fsort_thread , bct -> targ , work -> cmd );
145+ }
146+ blocked_list_delete (fsortBlocked , clientList );
147+ }
75148
149+ free (work -> argv );
150+ tpool_work_destroy (work );
151+
152+ printf ("[rm]Unblock done!\n" );
153+ return 1 ;
154+ }
155+
156+ int RedisModule_OnLoad (RedisModuleCtx * ctx , RedisModuleString * * argv , int argc ) {
157+ long long poolSize = 2 ;
76158 if (RedisModule_Init (ctx , "FilterSortModule" , 1 , REDISMODULE_APIVER_1 ) == REDISMODULE_ERR ) {
77159 return REDISMODULE_ERR ;
78160 }
@@ -88,6 +170,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx) {
88170 if (RedisModule_CreateCommand (ctx , "fsortaggregate" , FSortAggregate_RedisCommand , "write" , 1 , 2 , 1 ) == REDISMODULE_ERR ) {
89171 return REDISMODULE_ERR ;
90172 }
173+
174+ if (argc == 1 ) {
175+ RedisModule_StringToLongLong (argv [0 ], & poolSize );
176+ }
91177
178+ sortPool = tpool_create (poolSize );
179+ sortPool -> cb = FSort_onWorkFinish ;
180+ fsortBlocked = blocked_list_create ("fsort-f" );
92181 return REDISMODULE_OK ;
93182}
0 commit comments