diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 9e637ca96..8483bfebf 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1633,11 +1633,11 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do Left e -> do atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e) if clientServiceError e - then unassocQueues + then False <$ withStore' c (\db -> unassocUserServerRcvQueueSubs' db userId srv) else pure True where unassocQueues :: AM Bool - unassocQueues = False <$ withStore' c (\db -> unassocUserServerRcvQueueSubs' db userId srv) + unassocQueues = False <$ withStore' c (\db -> removeRcvServiceAssocs db userId srv) _ -> pure False subscribeUserServer :: Int -> TVar Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int) subscribeUserServer maxPending currPending ((userId, srv), hasService) = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 46a441aaf..a3178b6b4 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1726,20 +1726,33 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> AM ServiceSubResult resubscribeClientService c tSess@(userId, srv, _) serviceSub = - tryAllErrors (withServiceClient c tSess $ \smp _ -> subscribeClientService_ c True tSess smp serviceSub) >>= \case + tryAllErrors (withServiceClient c tSess subscribeOrUpdate) >>= \case Right r@(ServiceSubResult e _) -> case e of - Just SSErrorServiceId {} -> unassocSubscribeQueues $> r + Just SSErrorServiceId {} -> + r <$ withStore' c (\db -> removeRcvServiceAssocs db userId srv) _ -> pure r Left e -> do - when (clientServiceError e) $ unassocSubscribeQueues atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e) + when (clientServiceError e) $ do + atomically $ SS.deleteServiceSub tSess $ currentSubs c + unassocSubscribeQueues throwE e where + subscribeOrUpdate smp connServiceId + | connServiceId == SMP.smpServiceId serviceSub = + subscribeClientService_ c True tSess smp serviceSub + | otherwise = do + let newServiceSub = SMP.ServiceSub connServiceId 0 mempty + sessId = sessionId $ thParams smp + r = serviceSubResult serviceSub newServiceSub + atomically $ whenM (activeClientSession c tSess sessId) $ + SS.setActiveServiceSub tSess sessId newServiceSub $ currentSubs c + notifySub c $ SERVICE_UP srv r + pure r unassocSubscribeQueues = do qs <- withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv void $ lift $ subscribeUserServerQueues c userId srv qs --- TODO [certs rcv] update service in the database if it has different ID and re-associate queues, and send event subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> ServiceSub -> AM ServiceSubResult subscribeClientService c withEvent userId srv (ServiceSub _ n idsHash) = withServiceClient c tSess $ \smp smpServiceId -> do diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 32720dd85..0853bf626 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -2354,18 +2354,28 @@ getUserServerRcvQueueSubs db userId (SMPServer h p kh) onlyNeeded hasService = unassocUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub] unassocUserServerRcvQueueSubs db userId srv@(SMPServer h p kh) = do deleteClientService db userId srv +#if defined(dbPostgres) map toRcvQueueSub <$> DB.query db - (removeRcvAssocsQuery <> " " <> returningColums) + (removeRcvAssocsQuery <> " " <> returningColumns) (h, p, userId, kh) where - returningColums = + returningColumns = [sql| RETURNING c.user_id, rcv_queues.conn_id, rcv_queues.host, rcv_queues.port, COALESCE(rcv_queues.server_key_hash, s.key_hash), rcv_queues.rcv_id, rcv_queues.rcv_private_key, rcv_queues.status, c.enable_ntfs, rcv_queues.client_notice_id, rcv_queues.rcv_queue_id, rcv_queues.rcv_primary, rcv_queues.replace_rcv_queue_id |] +#else + qs <- map toRcvQueueSub + <$> DB.query + db + (rcvQueueSubQuery <> " WHERE c.user_id = ? AND q.host = ? AND q.port = ? AND COALESCE(q.server_key_hash, s.key_hash) = ? AND q.rcv_service_assoc = 1") + (userId, h, p, kh) + DB.execute db removeRcvAssocsQuery (h, p, userId, kh) + pure qs +#endif unassocUserServerRcvQueueSubs' :: DB.Connection -> UserId -> SMPServer -> IO () unassocUserServerRcvQueueSubs' db userId srv@(SMPServer h p kh) = do @@ -2376,7 +2386,7 @@ unsetQueuesToSubscribe :: DB.Connection -> IO () unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1" setRcvServiceAssocs :: SMPQueue q => DB.Connection -> [q] -> IO () -setRcvServiceAssocs db rqs = +setRcvServiceAssocs db rqs = do #if defined(dbPostgres) DB.execute db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id IN ?" $ Only $ In (map queueId rqs) #else diff --git a/src/Simplex/Messaging/Notifications/Server/Store.hs b/src/Simplex/Messaging/Notifications/Server/Store.hs index 0486978ec..294fe0498 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store.hs @@ -26,7 +26,6 @@ module Simplex.Messaging.Notifications.Server.Store stmAddNtfSubscription, stmDeleteNtfSubscription, stmStoreTokenLastNtf, - stmSetNtfService, ) where @@ -205,9 +204,6 @@ stmStoreTokenLastNtf (NtfSTMStore {tokens, tokenLastNtfs}) tknId ntf = do whenM (TM.member tknId tokens) $ TM.insertM tknId (newTVar [ntf]) tokenLastNtfs -stmSetNtfService :: NtfSTMStore -> SMPServer -> Maybe ServiceId -> STM () -stmSetNtfService (NtfSTMStore {ntfServices}) srv serviceId = - maybe (TM.delete srv) (TM.insert srv) serviceId ntfServices data TokenNtfMessageRecord = TNMRv1 NtfTokenId PNMessageData diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 3d977dc8c..e50416af6 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -2037,7 +2037,7 @@ client labelMyThread $ B.unpack ("client $" <> encode sessionId) <> " deliver/SEND" -- lookup can be outside of STM transaction, -- as long as the check that it is the same client is inside. - getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame + getSubscribed >>= mapM_ deliverIfSame deliverIfSame rcv = do ts <- getSystemSeconds atomically $ whenM (sameClient rc rcv) $ diff --git a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs index edf7f481c..77d9973e6 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs @@ -131,11 +131,13 @@ instance MsgStoreClass PostgresMsgStore where q.status, q.updated_at, q.link_id, q.rcv_service_id, m.msg_id, m.msg_ts, m.msg_quota, m.msg_ntf_flag, m.msg_body FROM msg_queues q - LEFT JOIN ( - SELECT recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body, - ROW_NUMBER() OVER (PARTITION BY recipient_id ORDER BY message_id ASC) AS row_num + LEFT JOIN LATERAL ( + SELECT msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body FROM messages - ) m ON q.recipient_id = m.recipient_id AND m.row_num = 1 + WHERE recipient_id = q.recipient_id + ORDER BY message_id ASC + LIMIT 1 + ) m ON true WHERE q.rcv_service_id = ? AND q.deleted_at IS NULL; |] (Only serviceId) diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index b824b61c3..b44280316 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -110,6 +110,7 @@ import Simplex.Messaging.Server.MsgStore.Types (SMSType (..), SQSType (..)) import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.Server.StoreLog (StoreLogRecord (..)) import Simplex.Messaging.Transport (ASrvTransport, SMPVersion, VersionSMP, authCmdsSMPVersion, currentServerSMPRelayVersion, minClientSMPRelayVersion, minServerSMPRelayVersion, sendingProxySMPVersion, sndAuthKeySMPVersion, alpnSupportedSMPHandshakes, supportedServerSMPRelayVRange) +import Simplex.Messaging.Transport.Server (TransportServerConfig (..)) import Simplex.Messaging.Util (bshow, diffToMicroseconds) import Simplex.Messaging.Version (VersionRange (..)) import qualified Simplex.Messaging.Version as V @@ -491,6 +492,8 @@ functionalAPITests ps = do describe "Client service certificates" $ do it "should connect, subscribe and reconnect as a service" $ testClientServiceConnection ps it "should re-subscribe when service ID changed" $ testClientServiceIDChange ps + it "should clear pending service sub when service unavailable" $ testServiceUnavailableClearsPending ps + it "should recover when service ID changes on reconnect" $ testServiceIdChangeOnReconnect ps it "migrate connections to and from service" $ testMigrateConnectionsToService ps describe "Connection switch" $ do describe "should switch delivery to the new queue" $ @@ -3905,6 +3908,95 @@ testClientServiceIDChange ps@(_, ASType qs _) = do ("", "", UP _ [_]) <- nGet user exchangeGreetingsMsgId 6 notService uId user sId +-- | Test that service subscription is correctly cleared and re-established +-- when server temporarily stops supporting services (askClientCert = False). +testServiceUnavailableClearsPending :: HasCallStack => (ASrvTransport, AStoreType) -> IO () +testServiceUnavailableClearsPending (t, msType) = do + -- Same agent across all phases to test pendingServiceSub persistence + withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do + -- Phase 1: Establish connection with active service subscription on normal server + (_sId, _uId) <- withSmpServerStoreLogOn (t, msType) testPort $ \_ -> runRight $ do + conns@(sId, uId) <- makeConnection service user + exchangeGreetings service uId user sId + pure conns + ("", "", SERVICE_DOWN _ _) <- nGet service + ("", "", DOWN _ [_]) <- nGet user + -- Phase 2: Server without service support: agent gets NO_SERVICE, queue resubscribed without service + let cfgNoService = updateCfg (cfgMS msType) $ \(cfg' :: ServerConfig s) -> + let ServerConfig {transportConfig} = cfg' + in cfg' {transportConfig = transportConfig {askClientCert = False}} :: ServerConfig s + withSmpServerConfigOn t cfgNoService testPort $ \_ -> do + ("", "", ERR (BROKER _ NO_SERVICE)) <- get service + ("", "", UP _ [_]) <- nGet service + ("", "", UP _ [_]) <- nGet user + pure () + ("", "", DOWN _ [_]) <- nGet service + ("", "", DOWN _ [_]) <- nGet user + -- Phase 3: Server with service support restored: only queue subscription, no service subscription + withSmpServerStoreLogOn (t, msType) testPort $ \_ -> do + e1 <- nGet service + case e1 of + ("", "", UP _ [_]) -> pure () -- Fixed: only queue subscription, no service subscription + ("", "", SERVICE_UP _ _) -> + expectationFailure "pendingServiceSub not cleared, service subscription attempted again" + ("", "", SERVICE_ALL _) -> + expectationFailure "pendingServiceSub not cleared, service subscription attempted again" + other -> expectationFailure $ "Unexpected first event: " <> show other + ("", "", UP _ [_]) <- nGet user + pure () + -- Phase 4: After another reconnect cycle, service subscription is re-established + ("", "", SERVICE_DOWN _ _) <- nGet service + ("", "", DOWN _ [_]) <- nGet user + withSmpServerStoreLogOn (t, msType) testPort $ \_ -> do + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ _)) -> True; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] + ("", "", UP _ [_]) <- nGet user + pure () + +-- | Test that service subscription recovers when service ID changes on reconnect. +-- Server restart with deleted service causes new service ID, triggering SSErrorServiceId. +-- Queues should be unassociated, resubscribed, and re-associated with new service. +testServiceIdChangeOnReconnect :: HasCallStack => (ASrvTransport, AStoreType) -> IO () +testServiceIdChangeOnReconnect ps@(_, ASType qs _) = do + withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do + -- Phase 1: Establish connection with active service subscription + (_sId, _uId) <- withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do + conns@(sId, uId) <- makeConnection service user + exchangeGreetings service uId user sId + pure conns + ("", "", SERVICE_DOWN _ _) <- nGet service + ("", "", DOWN _ [_]) <- nGet user + -- Delete service from server storage, keeping queues + _ :: () <- case qs of + SQSPostgres -> do +#if defined(dbServerPostgres) + st <- either (error . show) pure =<< Postgres.createDBStore testStoreDBOpts serverMigrations (MigrationConfig MCError Nothing) + void $ Postgres.withTransaction st (`PSQL.execute_` "DELETE FROM services") +#else + pure () +#endif + SQSMemory -> do + s <- readFile testStoreLogFile + removeFile testStoreLogFile + writeFile testStoreLogFile $ unlines $ filter (not . ("NEW_SERVICE" `isPrefixOf`)) $ lines s + -- Phase 2: Server restart with deleted service - new service ID, SSErrorServiceId + withSmpServerStoreLogOn ps testPort $ \_ -> do + ("", "", SERVICE_UP _ _) <- nGet service + ("", "", UP _ [_]) <- nGet user + pure () + -- Phase 3: Normal reconnect - service should subscribe normally + ("", "", SERVICE_DOWN _ _) <- nGet service + ("", "", DOWN _ [_]) <- nGet user + withSmpServerStoreLogOn ps testPort $ \_ -> do + liftIO $ getInAnyOrder service + [ \case ("", "", AEvt SAENone (SERVICE_UP _ _)) -> True; _ -> False, + \case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False + ] + ("", "", UP _ [_]) <- nGet user + pure () + testMigrateConnectionsToService :: HasCallStack => (ASrvTransport, AStoreType) -> IO () testMigrateConnectionsToService ps = do (((sId1, uId1), (uId2, sId2)), ((sId3, uId3), (uId4, sId4)), ((sId5, uId5), (uId6, sId6))) <- diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 27a72d2ac..deace417e 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -717,7 +717,7 @@ testServiceDeliverSubscribe = signSend_ sh aServicePK Nothing ("11", serviceId, SUBS 1 idsHash) [mId3] <- fmap catMaybes $ - receiveInAnyOrder -- race between SOKS and MSG, clients can handle it + receiveInAnyOrder -- race between SOKS, MSG and ALLS (sndQ and msgQ are separate threads) sh [ \case Resp "11" serviceId' (SOKS n idsHash') -> do @@ -731,9 +731,11 @@ testServiceDeliverSubscribe = rId'' `shouldBe` rId dec mId3 msg3 `shouldBe` Right "hello 3" pure $ Just $ Just mId3 + _ -> pure Nothing, + \case + Resp "" NoEntity ALLS -> pure $ Just Nothing _ -> pure Nothing ] - Resp "" NoEntity ALLS <- tGet1 sh Resp "12" _ OK <- signSendRecv sh rKey ("12", rId, ACK mId3) Resp "14" _ OK <- signSendRecv h sKey ("14", sId, _SEND "hello 4") Resp "" _ (Msg mId4 msg4) <- tGet1 sh @@ -811,7 +813,7 @@ testServiceUpgradeAndDowngrade = signSend_ sh aServicePK Nothing ("14", serviceId, SUBS 3 idsHash) [(rKey3_1, rId3_1, mId3_1), (rKey3_2, rId3_2, mId3_2)] <- fmap catMaybes $ - receiveInAnyOrder -- race between SOKS and MSG, clients can handle it + receiveInAnyOrder -- race between SOKS, MSG and ALLS (sndQ and msgQ are separate threads) sh [ \case Resp "14" serviceId' (SOKS n idsHash') -> do @@ -829,9 +831,11 @@ testServiceUpgradeAndDowngrade = Resp "" rId'' (Msg mId3 msg3) | rId'' == rId2 -> do dec2 mId3 msg3 `shouldBe` Right "hello 3.2" pure $ Just $ Just (rKey2, rId2, mId3) + _ -> pure Nothing, + \case + Resp "" NoEntity ALLS -> pure $ Just Nothing _ -> pure Nothing ] - Resp "" NoEntity ALLS <- tGet1 sh Resp "15" _ OK <- signSendRecv sh rKey3_1 ("15", rId3_1, ACK mId3_1) Resp "16" _ OK <- signSendRecv sh rKey3_2 ("16", rId3_2, ACK mId3_2) pure ()