Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1731,8 +1731,10 @@ resubscribeClientService c tSess@(userId, srv, _) serviceSub =
Just SSErrorServiceId {} -> unassocSubscribeQueues $> r
_ -> pure r
Left e -> do
when (clientServiceError e) $ unassocSubscribeQueues
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e)
when (clientServiceError e) $ do
atomically $ SS.deleteServiceSub tSess $ currentSubs c
unassocSubscribeQueues
throwE e
where
unassocSubscribeQueues = do
Expand Down
16 changes: 13 additions & 3 deletions src/Simplex/Messaging/Agent/Store/AgentStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2354,18 +2354,28 @@ getUserServerRcvQueueSubs db userId (SMPServer h p kh) onlyNeeded hasService =
unassocUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub]
unassocUserServerRcvQueueSubs db userId srv@(SMPServer h p kh) = do
deleteClientService db userId srv
#if defined(dbPostgres)
map toRcvQueueSub
<$> DB.query
db
(removeRcvAssocsQuery <> " " <> returningColums)
(removeRcvAssocsQuery <> " " <> returningColumns)
(h, p, userId, kh)
where
returningColums =
returningColumns =
[sql|
RETURNING c.user_id, rcv_queues.conn_id, rcv_queues.host, rcv_queues.port, COALESCE(rcv_queues.server_key_hash, s.key_hash),
rcv_queues.rcv_id, rcv_queues.rcv_private_key, rcv_queues.status, c.enable_ntfs, rcv_queues.client_notice_id,
rcv_queues.rcv_queue_id, rcv_queues.rcv_primary, rcv_queues.replace_rcv_queue_id
|]
#else
qs <- map toRcvQueueSub
<$> DB.query
db
(rcvQueueSubQuery <> " WHERE c.user_id = ? AND q.host = ? AND q.port = ? AND COALESCE(q.server_key_hash, s.key_hash) = ? AND q.rcv_service_assoc = 1")
(userId, h, p, kh)
DB.execute db removeRcvAssocsQuery (h, p, userId, kh)
pure qs
#endif

unassocUserServerRcvQueueSubs' :: DB.Connection -> UserId -> SMPServer -> IO ()
unassocUserServerRcvQueueSubs' db userId srv@(SMPServer h p kh) = do
Expand All @@ -2376,7 +2386,7 @@ unsetQueuesToSubscribe :: DB.Connection -> IO ()
unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1"

setRcvServiceAssocs :: SMPQueue q => DB.Connection -> [q] -> IO ()
setRcvServiceAssocs db rqs =
setRcvServiceAssocs db rqs = do
#if defined(dbPostgres)
DB.execute db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id IN ?" $ Only $ In (map queueId rqs)
#else
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2037,7 +2037,7 @@ client
labelMyThread $ B.unpack ("client $" <> encode sessionId) <> " deliver/SEND"
-- lookup can be outside of STM transaction,
-- as long as the check that it is the same client is inside.
getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame
getSubscribed >>= mapM_ deliverIfSame
deliverIfSame rcv = do
ts <- getSystemSeconds
atomically $ whenM (sameClient rc rcv) $
Expand Down
10 changes: 6 additions & 4 deletions src/Simplex/Messaging/Server/MsgStore/Postgres.hs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,13 @@ instance MsgStoreClass PostgresMsgStore where
q.status, q.updated_at, q.link_id, q.rcv_service_id,
m.msg_id, m.msg_ts, m.msg_quota, m.msg_ntf_flag, m.msg_body
FROM msg_queues q
LEFT JOIN (
SELECT recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body,
ROW_NUMBER() OVER (PARTITION BY recipient_id ORDER BY message_id ASC) AS row_num
LEFT JOIN LATERAL (
SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
FROM messages
) m ON q.recipient_id = m.recipient_id AND m.row_num = 1
WHERE recipient_id = q.recipient_id
ORDER BY message_id ASC
LIMIT 1
) m ON true
WHERE q.rcv_service_id = ? AND q.deleted_at IS NULL;
|]
(Only serviceId)
Expand Down
51 changes: 50 additions & 1 deletion tests/AgentTests/FunctionalAPITests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ import qualified Simplex.Messaging.Crypto.Ratchet as CR
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Transport (NTFVersion, pattern VersionNTF)
import Simplex.Messaging.Protocol (BasicAuth, ErrorType (..), MsgBody, NetworkError (..), ProtocolServer (..), SubscriptionMode (..), initialSMPClientVersion, srvHostnamesSMPClientVersion, supportedSMPClientVRange)
import Simplex.Messaging.Protocol (BasicAuth, BrokerErrorType (..), ErrorType (..), MsgBody, NetworkError (..), ProtocolServer (..), SubscriptionMode (..), initialSMPClientVersion, srvHostnamesSMPClientVersion, supportedSMPClientVRange)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Protocol.Types
import Simplex.Messaging.Server.Env.STM (AStoreType (..), ServerConfig (..), ServerStoreCfg (..), StorePaths (..))
Expand All @@ -110,6 +110,7 @@ import Simplex.Messaging.Server.MsgStore.Types (SMSType (..), SQSType (..))
import Simplex.Messaging.Server.QueueStore.QueueInfo
import Simplex.Messaging.Server.StoreLog (StoreLogRecord (..))
import Simplex.Messaging.Transport (ASrvTransport, SMPVersion, VersionSMP, authCmdsSMPVersion, currentServerSMPRelayVersion, minClientSMPRelayVersion, minServerSMPRelayVersion, sendingProxySMPVersion, sndAuthKeySMPVersion, alpnSupportedSMPHandshakes, supportedServerSMPRelayVRange)
import Simplex.Messaging.Transport.Server (TransportServerConfig (..))
import Simplex.Messaging.Util (bshow, diffToMicroseconds)
import Simplex.Messaging.Version (VersionRange (..))
import qualified Simplex.Messaging.Version as V
Expand Down Expand Up @@ -491,6 +492,7 @@ functionalAPITests ps = do
describe "Client service certificates" $ do
it "should connect, subscribe and reconnect as a service" $ testClientServiceConnection ps
it "should re-subscribe when service ID changed" $ testClientServiceIDChange ps
it "should clear pending service sub when service unavailable" $ testServiceUnavailableClearsPending ps
it "migrate connections to and from service" $ testMigrateConnectionsToService ps
describe "Connection switch" $ do
describe "should switch delivery to the new queue" $
Expand Down Expand Up @@ -3905,6 +3907,53 @@ testClientServiceIDChange ps@(_, ASType qs _) = do
("", "", UP _ [_]) <- nGet user
exchangeGreetingsMsgId 6 notService uId user sId

-- | Test that service subscription is correctly cleared and re-established
-- when server temporarily stops supporting services (askClientCert = False).
testServiceUnavailableClearsPending :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
testServiceUnavailableClearsPending (t, msType) = do
-- Same agent across all phases to test pendingServiceSub persistence
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
-- Phase 1: Establish connection with active service subscription on normal server
(_sId, _uId) <- withSmpServerStoreLogOn (t, msType) testPort $ \_ -> runRight $ do
conns@(sId, uId) <- makeConnection service user
exchangeGreetings service uId user sId
pure conns
("", "", SERVICE_DOWN _ _) <- nGet service
("", "", DOWN _ [_]) <- nGet user
-- Phase 2: Server without service support: agent gets NO_SERVICE, queue resubscribed without service
let cfgNoService = updateCfg (cfgMS msType) $ \(cfg' :: ServerConfig s) ->
let ServerConfig {transportConfig} = cfg'
in cfg' {transportConfig = transportConfig {askClientCert = False}} :: ServerConfig s
withSmpServerConfigOn t cfgNoService testPort $ \_ -> do
("", "", ERR (BROKER _ NO_SERVICE)) <- get service
("", "", UP _ [_]) <- nGet service
("", "", UP _ [_]) <- nGet user
pure ()
("", "", DOWN _ [_]) <- nGet service
("", "", DOWN _ [_]) <- nGet user
-- Phase 3: Server with service support restored: only queue subscription, no service subscription
withSmpServerStoreLogOn (t, msType) testPort $ \_ -> do
e1 <- nGet service
case e1 of
("", "", UP _ [_]) -> pure () -- Fixed: only queue subscription, no service subscription
("", "", SERVICE_UP _ _) ->
expectationFailure "pendingServiceSub not cleared, service subscription attempted again"
("", "", SERVICE_ALL _) ->
expectationFailure "pendingServiceSub not cleared, service subscription attempted again"
other -> expectationFailure $ "Unexpected first event: " <> show other
("", "", UP _ [_]) <- nGet user
pure ()
-- Phase 4: After another reconnect cycle, service subscription is re-established
("", "", SERVICE_DOWN _ _) <- nGet service
("", "", DOWN _ [_]) <- nGet user
withSmpServerStoreLogOn (t, msType) testPort $ \_ -> do
liftIO $ getInAnyOrder service
[ \case ("", "", AEvt SAENone (SERVICE_UP _ _)) -> True; _ -> False,
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
]
("", "", UP _ [_]) <- nGet user
pure ()

testMigrateConnectionsToService :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
testMigrateConnectionsToService ps = do
(((sId1, uId1), (uId2, sId2)), ((sId3, uId3), (uId4, sId4)), ((sId5, uId5), (uId6, sId6))) <-
Expand Down
12 changes: 8 additions & 4 deletions tests/ServerTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ testServiceDeliverSubscribe =
signSend_ sh aServicePK Nothing ("11", serviceId, SUBS 1 idsHash)
[mId3] <-
fmap catMaybes $
receiveInAnyOrder -- race between SOKS and MSG, clients can handle it
receiveInAnyOrder -- race between SOKS, MSG and ALLS (sndQ and msgQ are separate threads)
sh
[ \case
Resp "11" serviceId' (SOKS n idsHash') -> do
Expand All @@ -731,9 +731,11 @@ testServiceDeliverSubscribe =
rId'' `shouldBe` rId
dec mId3 msg3 `shouldBe` Right "hello 3"
pure $ Just $ Just mId3
_ -> pure Nothing,
\case
Resp "" NoEntity ALLS -> pure $ Just Nothing
_ -> pure Nothing
]
Resp "" NoEntity ALLS <- tGet1 sh
Resp "12" _ OK <- signSendRecv sh rKey ("12", rId, ACK mId3)
Resp "14" _ OK <- signSendRecv h sKey ("14", sId, _SEND "hello 4")
Resp "" _ (Msg mId4 msg4) <- tGet1 sh
Expand Down Expand Up @@ -811,7 +813,7 @@ testServiceUpgradeAndDowngrade =
signSend_ sh aServicePK Nothing ("14", serviceId, SUBS 3 idsHash)
[(rKey3_1, rId3_1, mId3_1), (rKey3_2, rId3_2, mId3_2)] <-
fmap catMaybes $
receiveInAnyOrder -- race between SOKS and MSG, clients can handle it
receiveInAnyOrder -- race between SOKS, MSG and ALLS (sndQ and msgQ are separate threads)
sh
[ \case
Resp "14" serviceId' (SOKS n idsHash') -> do
Expand All @@ -829,9 +831,11 @@ testServiceUpgradeAndDowngrade =
Resp "" rId'' (Msg mId3 msg3) | rId'' == rId2 -> do
dec2 mId3 msg3 `shouldBe` Right "hello 3.2"
pure $ Just $ Just (rKey2, rId2, mId3)
_ -> pure Nothing,
\case
Resp "" NoEntity ALLS -> pure $ Just Nothing
_ -> pure Nothing
]
Resp "" NoEntity ALLS <- tGet1 sh
Resp "15" _ OK <- signSendRecv sh rKey3_1 ("15", rId3_1, ACK mId3_1)
Resp "16" _ OK <- signSendRecv sh rKey3_2 ("16", rId3_2, ACK mId3_2)
pure ()
Expand Down
Loading