diff --git a/src/Control/Monad/Trace.hs b/src/Control/Monad/Trace.hs index 8b41257..57a7dec 100644 --- a/src/Control/Monad/Trace.hs +++ b/src/Control/Monad/Trace.hs @@ -43,7 +43,7 @@ import Data.Time.Clock (NominalDiffTime) import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime) import UnliftIO (MonadUnliftIO, withRunInIO) import UnliftIO.Exception (finally) -import UnliftIO.STM (TChan, TVar, atomically, modifyTVar', newTChanIO, newTVarIO, readTVar, writeTChan, writeTVar) +import UnliftIO.STM (TQueue, TVar, atomically, modifyTVar', newTQueueIO, newTVarIO, readTVar, writeTQueue, writeTVar) -- | A collection of span tags. type Tags = Map Key JSON.Value @@ -76,13 +76,13 @@ data Sample = Sample -- These samples can then be consumed independently, decoupling downstream span processing from -- their production. data Tracer = Tracer - { tracerChannel :: TChan Sample - , tracerPendingCount :: TVar Int + { tracerChannel :: !(TQueue Sample) + , tracerPendingCount :: !(TVar Int) } -- | Creates a new 'Tracer'. newTracer :: MonadIO m => m Tracer -newTracer = liftIO $ Tracer <$> newTChanIO <*> newTVarIO 0 +newTracer = liftIO $ Tracer <$> newTQueueIO <*> newTVarIO 0 -- | Returns the number of spans currently in flight (started but not yet completed). pendingSpanCount :: Tracer -> TVar Int @@ -90,7 +90,7 @@ pendingSpanCount = tracerPendingCount -- | Returns all newly completed spans' samples. The samples become available in the same order they -- are completed. 
-spanSamples :: Tracer -> TChan Sample +spanSamples :: Tracer -> TQueue Sample spanSamples = tracerChannel data Scope = Scope @@ -145,7 +145,7 @@ instance MonadUnliftIO m => MonadTrace (TraceT m) where modifyTVar' (tracerPendingCount tracer) (\n -> n - 1) tags <- readTVar tagsTV logs <- sortOn (\(t, k, _) -> (t, k)) <$> readTVar logsTV - writeTChan (tracerChannel tracer) (Sample spn tags logs start (end - start)) + writeTQueue (tracerChannel tracer) (Sample spn tags logs start (end - start)) run `finally` cleanup else local (const $ Scope tracer (Just spn) Nothing Nothing) reader diff --git a/src/Monitor/Tracing/Local.hs b/src/Monitor/Tracing/Local.hs index e980ae5..b188311 100644 --- a/src/Monitor/Tracing/Local.hs +++ b/src/Monitor/Tracing/Local.hs @@ -4,10 +4,12 @@ module Monitor.Tracing.Local ( collectSpanSamples ) where -import Control.Concurrent.STM (atomically, readTVar, readTChan, tryReadTChan) +import Control.Concurrent.STM (atomically, readTVar, flushTQueue, readTQueue) +import Control.Monad (when) import Control.Monad.Fix (fix) import Control.Monad.IO.Class (liftIO) import Control.Monad.Trace +import Data.Foldable (for_) import Data.IORef (modifyIORef', newIORef, readIORef) import UnliftIO (MonadUnliftIO) @@ -26,16 +28,15 @@ collectSpanSamples :: MonadUnliftIO m => TraceT m a -> m (a, [Sample]) collectSpanSamples actn = do tracer <- newTracer rv <- runTraceT actn tracer - ref <- liftIO $ newIORef [] - let - addSample spl = liftIO $ modifyIORef' ref (spl:) - samplesTC = spanSamples tracer - pendingTV = pendingSpanCount tracer - liftIO $ fix $ \loop -> do - (mbSample, pending) <- atomically $ (,) <$> tryReadTChan samplesTC <*> readTVar pendingTV - case mbSample of - Just spl -> addSample spl >> loop - Nothing | pending > 0 -> liftIO (atomically $ readTChan samplesTC) >>= addSample >> loop - _ -> pure () - spls <- reverse <$> liftIO (readIORef ref) - pure (rv, spls) + liftIO $ do + ref <- newIORef [] + let + addSample spl = modifyIORef' ref (spl:) + 
samplesTQ = spanSamples tracer + pendingTV = pendingSpanCount tracer + fix $ \loop -> do + (samples, pending) <- atomically $ (,) <$> flushTQueue samplesTQ <*> readTVar pendingTV + for_ samples addSample + when (pending > 0) $ (atomically $ readTQueue samplesTQ) >>= addSample >> loop + spls <- reverse <$> readIORef ref + pure (rv, spls) diff --git a/src/Monitor/Tracing/Zipkin.hs b/src/Monitor/Tracing/Zipkin.hs index 9844d82..2e5cdff 100644 --- a/src/Monitor/Tracing/Zipkin.hs +++ b/src/Monitor/Tracing/Zipkin.hs @@ -1,4 +1,5 @@ {-# LANGUAGE CPP #-} +{-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} @@ -18,9 +19,16 @@ module Monitor.Tracing.Zipkin ( -- ** Endpoint Endpoint(..), defaultEndpoint, - -- * Publishing traces + -- * Tracing actions and publishing traces + -- ** Automatically Zipkin, - new, run, publish, with, + new, with, + PublishFailed(..), + + -- ** Manually + -- | When more flexibility is needed, it's possible to control when traces are published by using + -- the functions below. 
+ run, publish, publishSpans, -- * Cross-process spans -- ** Communication @@ -41,19 +49,17 @@ module Monitor.Tracing.Zipkin ( import Control.Monad.Trace import Control.Monad.Trace.Class -import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.STM (atomically, tryReadTChan) -import Control.Monad (forever, guard, void, when) -import Control.Monad.Fix (fix) +import Control.Concurrent.STM (flushTQueue) +import Control.Exception (Exception, SomeException, throw) +import Control.Monad (forever, guard, mfilter, void, when) import Control.Monad.IO.Class (MonadIO, liftIO) import qualified Data.Aeson as JSON import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as BS import Data.CaseInsensitive (CI) import Data.Time.Clock (NominalDiffTime) -import Data.Foldable (toList) +import Data.Foldable (for_, toList) import Data.Int (Int64) -import Data.IORef (modifyIORef, newIORef, readIORef) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.Maybe (catMaybes, fromMaybe, listToMaybe, maybeToList) @@ -72,7 +78,9 @@ import Network.HTTP.Client (Manager, Request) import qualified Network.HTTP.Client as HTTP import Network.Socket (HostName, PortNumber) import UnliftIO (MonadUnliftIO) -import UnliftIO.Exception (finally) +import UnliftIO.Concurrent (forkIO, threadDelay) +import UnliftIO.Exception (catch, finally) +import UnliftIO.STM (atomically) -- | 'Zipkin' creation settings. 
data Settings = Settings @@ -111,17 +119,6 @@ data Zipkin = Zipkin , zipkinEndpoint :: !(Maybe Endpoint) } -flushSpans :: Maybe Endpoint -> Tracer -> Request -> Manager -> IO () -flushSpans ept tracer req mgr = do - ref <- newIORef [] - fix $ \loop -> atomically (tryReadTChan $ spanSamples tracer) >>= \case - Nothing -> pure () - Just sample -> modifyIORef ref (ZipkinSpan ept sample:) >> loop - spns <- readIORef ref - when (not $ null spns) $ do - let req' = req { HTTP.requestBody = HTTP.RequestBodyLBS $ JSON.encode spns } - void $ HTTP.httpLbs req' mgr - -- | Creates a 'Zipkin' publisher for the input 'Settings'. new :: MonadIO m => Settings -> m Zipkin new (Settings mbHostname mbPort mbEpt mbMgr mbPrd) = liftIO $ do @@ -135,12 +132,11 @@ new (Settings mbHostname mbPort mbEpt mbMgr mbPrd) = liftIO $ do , HTTP.port = maybe 9411 fromIntegral mbPort , HTTP.requestHeaders = [("content-type", "application/json")] } - void $ let prd = fromMaybe 0 mbPrd in if prd <= 0 - then pure Nothing - else fmap Just $ forkIO $ forever $ do - threadDelay (microSeconds prd) - flushSpans mbEpt tracer req mgr -- Manager is thread-safe. - pure $ Zipkin mgr req tracer mbEpt + zpk = Zipkin mgr req tracer mbEpt + for_ (microSeconds <$> mfilter (> 0) mbPrd) $ \delay -> forkIO $ forever $ do + threadDelay delay + publish zpk + pure zpk -- | Runs a 'TraceT' action, sampling spans appropriately. Note that this method does not publish -- spans on its own; to do so, either call 'publish' manually or specify a positive @@ -148,10 +144,30 @@ new (Settings mbHostname mbPort mbEpt mbMgr mbPrd) = liftIO $ do run :: TraceT m a -> Zipkin -> m a run actn zipkin = runTraceT actn (zipkinTracer zipkin) --- | Flushes all complete spans to the Zipkin server. -publish :: MonadIO m => Zipkin -> m () -publish z = - liftIO $ flushSpans (zipkinEndpoint z) (zipkinTracer z) (zipkinRequest z) (zipkinManager z) +-- | Error thrown when span publication failed, for example if the server is unreachable. 
The error +-- exposes the underlying cause as well as the spans which could not be flushed. +data PublishFailed = forall e. Exception e => PublishFailed [ZipkinSpan] e + +instance Show PublishFailed where + show (PublishFailed _ e) = "PublishFailed: " <> show e + +instance Exception PublishFailed + +-- | Flushes the given spans to the Zipkin server. This function is a no-op when input is empty. If +-- publication fails, this function throws a 'PublishFailed' exception. +publishSpans :: MonadUnliftIO m => Zipkin -> [ZipkinSpan] -> m () +publishSpans zpk spns = when (not $ null spns) $ + let + req = (zipkinRequest zpk) { HTTP.requestBody = HTTP.RequestBodyLBS $ JSON.encode spns } + send = void $ liftIO $ HTTP.httpLbs req (zipkinManager zpk) + in catch send $ \e -> throw (PublishFailed spns (e :: SomeException)) + +-- | Flushes all complete spans to the Zipkin server. See 'publishSpans' for error handling. +publish :: MonadUnliftIO m => Zipkin -> m () +publish zpk = liftIO $ do + let mbEpt = zipkinEndpoint zpk + samples <- atomically $ flushTQueue $ spanSamples $ zipkinTracer zpk + publishSpans zpk $ fmap (ZipkinSpan mbEpt) samples -- | Convenience method to start a 'Zipkin', run an action, and publish all spans before returning. with :: MonadUnliftIO m => Settings -> (Zipkin -> m a) -> m a @@ -421,8 +437,8 @@ instance JSON.ToJSON ZipkinAnnotation where [ "timestamp" JSON..= microSeconds @Int64 t , "value" JSON..= v ] --- Internal type used to encode spans in the --- expected by Zipkin. +-- Type used to encode spans in the format expected by +-- Zipkin. data ZipkinSpan = ZipkinSpan !(Maybe Endpoint) !Sample publicTags :: Tags -> Map Text JSON.Value diff --git a/tracing.cabal b/tracing.cabal index a71239e..ab5efd7 100644 --- a/tracing.cabal +++ b/tracing.cabal @@ -1,7 +1,7 @@ cabal-version: 2.0 name: tracing -version: 0.0.6.2 +version: 0.0.7.0 synopsis: Distributed tracing description: An OpenTracing-compliant, simple, and extensible distributed tracing library.