diff --git a/bench-test-lib/bench-test-lib.cabal b/bench-test-lib/bench-test-lib.cabal index b499cc49d6..2fb4cf153a 100644 --- a/bench-test-lib/bench-test-lib.cabal +++ b/bench-test-lib/bench-test-lib.cabal @@ -63,5 +63,6 @@ library , streamly-core , streamly , directory + , filepath hs-source-dirs: src default-language: Haskell2010 diff --git a/bench-test-lib/src/BenchTestLib/DirIO.hs b/bench-test-lib/src/BenchTestLib/DirIO.hs index dd9e588903..d0008a966a 100644 --- a/bench-test-lib/src/BenchTestLib/DirIO.hs +++ b/bench-test-lib/src/BenchTestLib/DirIO.hs @@ -29,37 +29,51 @@ module BenchTestLib.DirIO , listDirChunkPar , listDirChunkParInterleaved , listDirChunkParOrdered -#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__) + , listDirChunkFoldDfs + , listDirChunkFoldBfs + , listDirChunkFoldBfsRev + , listDirChunkFoldAppend + , listDirChunkFoldInterleave + , listDirChunkFoldPar + , listDirChunkFoldParInterleaved + , listDirChunkFoldParOrdered , listDirByteChunked -#endif + , listDirByteChunkedFold ) where -------------------------------------------------------------------------------- -- Imports -------------------------------------------------------------------------------- +import Data.Foldable (for_) import Data.IORef (newIORef, modifyIORef', readIORef) import Data.Maybe (fromJust) import Data.Word (Word8) -#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__) -import Streamly.Data.Array (Array) +#if defined(mingw32_HOST_OS) || defined(__MINGW32__) +import Data.Word (Word16) #endif +import Streamly.Data.Array (Array) import Streamly.Data.Stream (Stream) import Streamly.Data.Unfold (Unfold) import Streamly.FileSystem.Path (Path) import Streamly.Unicode.String (str) import System.Directory (createDirectoryIfMissing) -import Data.Foldable (for_) +import System.FilePath (()) import qualified Streamly.Data.Stream.Prelude as Stream import qualified Streamly.Data.Array as Array +import qualified Streamly.Data.Fold as Fold import qualified Streamly.Internal.Data.Stream as Stream import qualified Streamly.Data.StreamK as StreamK import qualified Streamly.Internal.Data.StreamK as StreamK import qualified Streamly.Data.Unfold as Unfold import qualified Streamly.Internal.Data.Unfold as Unfold import qualified Streamly.Internal.FileSystem.DirIO as Dir -import qualified Streamly.FileSystem.Path as Path +import qualified Streamly.Internal.FileSystem.Path as Path +#if defined(mingw32_HOST_OS) || defined(__MINGW32__) +import qualified Streamly.Internal.Data.Array as Array (unsafeCast) +import qualified Streamly.Unicode.Stream as Unicode +#endif -------------------------------------------------------------------------------- -- Helpers @@ -102,7 +116,6 @@ streamDirMaybe -> Either Path b -> Maybe (Stream IO (Either Path Path)) streamDirMaybe f = either (Just . Dir.readEitherPaths f) (const Nothing) -#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__) _streamDirByteChunked :: (Dir.ReadOptions -> Dir.ReadOptions) -> Either [Path] b -> Stream IO (Either [Path] (Array Word8)) @@ -113,7 +126,16 @@ streamDirByteChunkedMaybe -> Either [Path] b -> Maybe (Stream IO (Either [Path] (Array Word8))) streamDirByteChunkedMaybe f = either (Just . Dir.readEitherByteChunks f) (const Nothing) -#endif + +-- | Like 'streamDirByteChunkedMaybe' but uses 'Dir.readEitherFold' with +-- 'Path.packPathsEndBy' to produce newline-terminated byte chunks. +streamDirByteChunkedFoldMaybe + :: (Dir.ReadOptions -> Dir.ReadOptions) + -> Either [Path] b -> Maybe (Stream IO (Either [Path] (Array Word8))) +streamDirByteChunkedFoldMaybe f = + either + (Just . flip (Dir.readEitherFold f) (Path.packPathsEndBy 32000 10)) + (const Nothing) streamDirChunkedMaybe :: (Dir.ReadOptions -> Dir.ReadOptions) @@ -125,6 +147,21 @@ streamDirChunked -> Either [Path] b -> Stream IO (Either [Path] [Path]) streamDirChunked f = either (Dir.readEitherChunks f) (const Stream.nil) +-- | Same shape as 'streamDirChunkedMaybe' but uses 'Dir.readEitherFold' with +-- 'Fold.toList', which should be equivalent in coverage to +-- 'Dir.readEitherChunks' for traversal tests. +streamDirChunkedFoldMaybe + :: (Dir.ReadOptions -> Dir.ReadOptions) + -> Either [Path] b -> Maybe (Stream IO (Either [Path] [Path])) +streamDirChunkedFoldMaybe f = + either (Just . flip (Dir.readEitherFold f) Fold.toList) (const Nothing) + +streamDirChunkedFold + :: (Dir.ReadOptions -> Dir.ReadOptions) + -> Either [Path] b -> Stream IO (Either [Path] [Path]) +streamDirChunkedFold f = + either (flip (Dir.readEitherFold f) Fold.toList) (const Stream.nil) + -------------------------------------------------------------------------------- -- Functions -------------------------------------------------------------------------------- @@ -139,20 +176,47 @@ createDirStucture root d w = do createDirStucture_ ref parentDir depth width = do for_ [1..width] $ \i -> do let iStr = show i - subDir = [str|#{parentDir}/dir_#{iStr}|] + subDir = [str|#{parentDir}|] [str|dir_#{iStr}|] createDirectoryIfMissing True subDir modifyIORef' ref (subDir:) createDirStucture_ ref subDir (depth - 1) width -#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__) --- Fastest implementation, only works for posix as of now. +-- Fastest implementation. On Windows the underlying readEitherByteChunks +-- emits UTF-16LE bytes (each char as 2 bytes, separator '\\' and newline '\n' +-- also encoded as 2 bytes). Chunks are split at entry boundaries, so each +-- chunk can be transcoded independently to UTF-8 without splitting surrogate +-- pairs. On Posix the bytes are already UTF-8. +{-# INLINE toUtf8Chunks #-} +toUtf8Chunks :: Stream IO (Array Word8) -> Stream IO (Array Word8) +#if defined(mingw32_HOST_OS) || defined(__MINGW32__) +toUtf8Chunks = Stream.mapM utf16leChunkToUtf8 + +utf16leChunkToUtf8 :: Array Word8 -> IO (Array Word8) +utf16leChunkToUtf8 arr = + Stream.fold Array.create + $ Unicode.encodeUtf8 + $ Unicode.decodeUtf16le + $ Array.read (Array.unsafeCast arr :: Array Word16) +#else +toUtf8Chunks = id +#endif + {-# INLINE listDirByteChunked #-} listDirByteChunked :: FilePath -> Stream IO (Array Word8) -listDirByteChunked inp = do - Stream.catRights +listDirByteChunked inp = + toUtf8Chunks + $ Stream.catRights $ Stream.concatIterate (streamDirByteChunkedMaybe id) $ Stream.fromPure (Left [fromJust $ Path.fromString inp]) -#endif + +-- | Like 'listDirByteChunked' but uses 'Dir.readEitherFold' with +-- 'Path.packPathsEndBy' to compact paths into newline-terminated byte chunks. +{-# INLINE listDirByteChunkedFold #-} +listDirByteChunkedFold :: FilePath -> Stream IO (Array Word8) +listDirByteChunkedFold inp = do + Stream.catRights + $ Stream.concatIterate (streamDirByteChunkedFoldMaybe id) + $ Stream.fromPure (Left [fromJust $ Path.fromString inp]) -- Faster than the listDir implementation below {-# INLINE listDirChunkedWith #-} @@ -161,19 +225,36 @@ listDirChunkedWith -> [Char] -> Stream IO Word8 listDirChunkedWith act inp = do Stream.unfoldEachEndBy 10 Array.reader - $ fmap (Array.asBytes . Path.toArray) + $ fmap Path.toUtf8Array $ Stream.unfoldEach Unfold.fromList $ fmap (either id id) $ act $ Stream.fromPure (Left [fromJust $ Path.fromString inp]) +-- | Variant of 'listDirChunkedWith' for the 'readEitherFold' variants. The +-- fold receives both directory and file entries, so its 'Right' output already +-- contains every path; the 'Left' chunks are only used to drive further +-- traversal and must be discarded here to avoid emitting directory paths +-- twice. +{-# INLINE listDirChunkedFoldWith #-} +listDirChunkedFoldWith + :: (Stream IO (Either [Path] b) -> Stream IO (Either [Path] [Path])) + -> [Char] -> Stream IO Word8 +listDirChunkedFoldWith act inp = do + Stream.unfoldEachEndBy 10 Array.reader + $ fmap Path.toUtf8Array + $ Stream.unfoldEach Unfold.fromList + $ Stream.catRights + $ act + $ Stream.fromPure (Left [fromJust $ Path.fromString inp]) + {-# INLINE listDirWith #-} listDirWith :: (Stream IO (Either Path Path) -> Stream IO (Either Path Path)) -> [Char] -> Stream IO Word8 listDirWith act inp = do Stream.unfoldEachEndBy 10 Array.reader - $ fmap (Array.asBytes . Path.toArray . either id id) + $ fmap (Path.toUtf8Array . either id id) $ act $ Stream.fromPure (Left (fromJust $ Path.fromString inp)) @@ -295,3 +376,69 @@ listDirChunkParOrdered listDirChunkParOrdered f = listDirChunkedWith (Stream.parConcatIterate (Stream.ordered True) (streamDirChunked f)) + +-------------------------------------------------------------------------------- +-- Chunked via readEitherFold + Fold.toList +-------------------------------------------------------------------------------- + +-- These mirror the listDirChunk* variants above but exercise 'readEitherFold' +-- with 'Fold.toList'. Since the fold receives all entries (dirs and files), +-- they use 'listDirChunkedFoldWith' which keeps only the 'Right' output and +-- discards the 'Left' traversal-driver chunks. + +{-# INLINE listDirChunkFoldDfs #-} +listDirChunkFoldDfs + :: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8 +listDirChunkFoldDfs f = + listDirChunkedFoldWith (Stream.concatIterate (streamDirChunkedFoldMaybe f)) + +{-# INLINE listDirChunkFoldBfs #-} +listDirChunkFoldBfs + :: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8 +listDirChunkFoldBfs f = + listDirChunkedFoldWith + (Stream.bfsConcatIterate (streamDirChunkedFoldMaybe f)) + +{-# INLINE listDirChunkFoldBfsRev #-} +listDirChunkFoldBfsRev + :: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8 +listDirChunkFoldBfsRev f = + listDirChunkedFoldWith + (Stream.altBfsConcatIterate (streamDirChunkedFoldMaybe f)) + +{-# INLINE listDirChunkFoldAppend #-} +listDirChunkFoldAppend + :: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8 +listDirChunkFoldAppend f = + listDirChunkedFoldWith + (concatIterateWith (streamDirChunkedFold f) StreamK.append) + +{-# INLINE listDirChunkFoldInterleave #-} +listDirChunkFoldInterleave + :: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8 +listDirChunkFoldInterleave f = + listDirChunkedFoldWith + (mergeIterateWith (streamDirChunkedFold f) StreamK.interleave) + +{-# INLINE listDirChunkFoldPar #-} +listDirChunkFoldPar + :: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8 +listDirChunkFoldPar f = + listDirChunkedFoldWith + (Stream.parConcatIterate id (streamDirChunkedFold f)) + +{-# INLINE listDirChunkFoldParInterleaved #-} +listDirChunkFoldParInterleaved + :: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8 +listDirChunkFoldParInterleaved f = + listDirChunkedFoldWith + (Stream.parConcatIterate + (Stream.interleaved True) (streamDirChunkedFold f)) + +{-# INLINE listDirChunkFoldParOrdered #-} +listDirChunkFoldParOrdered + :: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8 +listDirChunkFoldParOrdered f = + listDirChunkedFoldWith + (Stream.parConcatIterate + (Stream.ordered True) (streamDirChunkedFold f)) diff --git a/core/src/Streamly/FileSystem/Path.hs b/core/src/Streamly/FileSystem/Path.hs index bf1c4bfffd..757c4e2fde 100644 --- a/core/src/Streamly/FileSystem/Path.hs +++ b/core/src/Streamly/FileSystem/Path.hs @@ -107,6 +107,7 @@ module Streamly.FileSystem.Path , unsafeJoin , join , joinStr + , packPathsEndBy -- * Splitting root , splitRoot diff --git a/core/src/Streamly/Internal/FileSystem/DirIO.hs b/core/src/Streamly/Internal/FileSystem/DirIO.hs index a80dbb7551..02c519e9b1 100644 --- a/core/src/Streamly/Internal/FileSystem/DirIO.hs +++ b/core/src/Streamly/Internal/FileSystem/DirIO.hs @@ -8,103 +8,7 @@ -- Maintainer : streamly@composewell.com -- Portability : GHC -- --- Directory traversal API design notes: --- --- Paths returned by readdir: --- -------------------------- --- The paths returned by "read" can be absolute (/x/y/z), relative to current --- directory e.g. if current dir is /x then path is (./y/z), or path segments --- relative to current dir e.g. (y/z). To accomodate all the cases we can --- provide a prefix to readdir to attach to the paths being generated e.g. the --- prefix would be "/x" in the previous example. Alternatively, we could take --- the approach of the higher layer doing that, but it is more efficient to --- allocate the path buffer once rather than modifying it later. We can do this --- by passing a fold to transform the output. --- --- Filtering of paths: --- ------------------- --- --- It may be more efficient to apply a filter to the paths during readdir API --- itself instead of applying it in a layer above. That way we cut the output --- at the source rather than generate and then discard it later. We can do this --- by passing a fold to filter the input. --- --- Symlink Resolution: --- ------------------- --- --- When reading a symlink directory we can resolve the symlink and read the --- destination directory or we can just emit the file it is pointing to and --- the read can happen next at the higher level, in the traversal logic --- (concatIterate). Not sure if one approach has any significant perf impact --- over the other. Similar thinking applies to a mount point as well. Also, if --- we resolve the symlinks in concatIterate, then each resolution will be --- counted as depth level increment whereas if we resolve that at lower level --- then it won't. We can do this by passing an option to modify the behavior. --- --- Cyclic paths: --- ------------- --- --- When resolving cyclic directory symlinks one way to curtail it is ELOOP --- which gives up if it encounters too many level. Another way is to use --- the inode information to check if we are traversing an already traversed --- inode, this is in general helpful in a graph traversal. We can ignore --- ELOOP by passing an option but it may be inefficient because we may --- encounter the loop from any node in the cycle. --- --- Broken links and permission issues: --- ----------------------------------- --- --- If we encounter an error reading a directory because of permission --- issues should we ignore it in this low level API or catch it in the --- higher level traversal functionality? Similarly, if there are broken --- symlinks, where to handle the error? Need to check performance when --- handling it in ListDir. Suppressing the error at the lower level may be --- more efficient than propagating it up and then handling it there. We can --- do this by passing an option. --- --- Returning the metadata: --- ----------------------- --- --- Specific scans can be used to return the metadata in the output stream if --- needed. However, we may need three different APIs: without metadata, with --- fast metadata, and with full metadata. In each case the fold input would --- be different. --- --- * readMinimal: read only the path names, no metadata --- * readStandard: read the path and minimal metadata --- * readFull: read full metadata --- --- NOTE: Full metadata can be read by mapping a stat call to a stream of paths --- rather than via readdir API. Does it help the performance to do it in the --- readdir API? --- --- Passing a scan to the readdir operation: --- ---------------------------------------- --- --- There are two approaches to traversal and filtering. (1) Read the attributes --- as data and provide it to a high level traversal handler to do filtering etc. --- (2) pass a fold or scan to the reader itself which does the filtering. --- --- By passing a scan we can process the output right at the source and produce --- a cooked output. Otherwise we may have to produce a stream of intermediate --- structures which may have more per item overhead and that overhead may not --- get eliminated by fusion. For example, a fold can directly write the CString --- from readdir to the output buffer whereas if we output the Path then we will --- incur an overhead of intermediate structure. --- --- Filesystem functionality modules: --- --------------------------------- --- --- This DirIO module mainly provides "readdir" functionality. File stat --- functionality is coupled to readdir because we may return file stats along --- with filepaths, or may provide functionality to filter based on stats. --- --- The FileTest module in streamly-coreutils provides the file stat read --- operations, we may have to bring that here if some coupling with readdir is --- needed. --- --- FileIO module provides regular file create operation. --- +-- See docs/Developer/FileSystem.DirIO.md for design notes. module Streamly.Internal.FileSystem.DirIO ( @@ -128,6 +32,7 @@ module Streamly.Internal.FileSystem.DirIO , readEither , readEitherPaths , OS.readEitherChunks + , OS.readEitherFold , OS.readEitherByteChunks , readEitherChunksPortable diff --git a/core/src/Streamly/Internal/FileSystem/DirOptions.hs b/core/src/Streamly/Internal/FileSystem/DirOptions.hs index 7aeb1b227f..927d6d8695 100644 --- a/core/src/Streamly/Internal/FileSystem/DirOptions.hs +++ b/core/src/Streamly/Internal/FileSystem/DirOptions.hs @@ -5,6 +5,8 @@ -- License : BSD3 -- Maintainer : streamly@composewell.com -- Portability : GHC +-- +-- See docs/Developer/FileSystem.DirIO.md for design notes. module Streamly.Internal.FileSystem.DirOptions ( @@ -17,21 +19,6 @@ module Streamly.Internal.FileSystem.DirOptions ) where --- NOTE: If we are following symlinks, then we want to determine the type of --- the link destination not the link itself, so we need to use stat instead of --- lstat for resolving the symlink. --- --- For recursive traversal, instead of classifying the dirents using stat, we --- can leave them unclassified, and deal with ENOTDIR when doing an opendir. We --- can just ignore that error if it is not a dir. This way we do not need to do --- stat at all. Or we can basically say don't try to determine the type of --- symlinks and always try to read symlinks as dirs. We can have an option for --- classifying symlinks or DT_UNKNOWN as potential dirs. - --- When resolving a symlink we may encounter errors only if the directory entry --- is a symlink. If the directory entry is not a symlink then stat on it will --- have permissions, it will not give ELOOP or ENOENT unless the file was --- deleted or recreated after we read the dirent. -- | Options controlling the behavior of directory read. data ReadOptions = diff --git a/core/src/Streamly/Internal/FileSystem/PosixPath.hs b/core/src/Streamly/Internal/FileSystem/PosixPath.hs index 4682e7aedd..3dfd5c5ad5 100644 --- a/core/src/Streamly/Internal/FileSystem/PosixPath.hs +++ b/core/src/Streamly/Internal/FileSystem/PosixPath.hs @@ -159,6 +159,7 @@ module Streamly.Internal.FileSystem.OS_PATH_TYPE , join , joinDir , unsafeJoinPaths + , packPathsEndBy -- * Splitting -- | Note: you can use 'unsafeJoin' as a replacement for the joinDrive @@ -218,7 +219,11 @@ where import Control.Exception (throw) import Control.Monad.Catch (MonadThrow(..)) +import Control.Monad.IO.Class (MonadIO(..)) import Data.Bifunctor (bimap) +#ifdef IS_WINDOWS +import Data.Bits ((.&.), (.|.), shiftL, shiftR) +#endif import Data.Functor.Identity (Identity(..)) import Data.Maybe (fromJust, isJust) import Data.Word (Word8) @@ -229,13 +234,17 @@ import Foreign (castPtr) import Data.Word (Word16) import Foreign (Ptr) #endif +import Fusion.Plugin.Types (Fuse(..)) import Language.Haskell.TH.Syntax (lift) import Streamly.Internal.Data.Array (Array(..)) +import Streamly.Internal.Data.Fold (Fold(..)) +import Streamly.Internal.Data.MutByteArray (MutByteArray) import Streamly.Internal.Data.Stream (Stream) import Streamly.Internal.FileSystem.Path.Common (mkQ, EqCfg(..)) import qualified Streamly.Internal.Data.Array as Array import qualified Streamly.Internal.Data.Fold as Fold +import qualified Streamly.Internal.Data.MutByteArray as MutByteArray import qualified Streamly.Internal.Data.Stream as Stream import qualified Streamly.Internal.FileSystem.Path.Common as Common import qualified Streamly.Internal.Unicode.Stream as Unicode @@ -1723,3 +1732,70 @@ stripPrefix cfg (OS_PATH prefix) (OS_PATH p) = $ Common.stripPrefix Unicode.UNICODE_DECODER Common.OS_NAME (cfg eqCfg) prefix p + +------------------------------------------------------------------------------ +-- Packing paths into a byte array +------------------------------------------------------------------------------ + +#ifndef IS_WINDOWS + +{-# ANN type PackPathsState Fuse #-} +data PackPathsState = + PackPathsState !MutByteArray !Int !Int -- buf, pos, cap + +-- XXX We can allocate the array on the first input. + +-- | Copies a stream of file paths into a contiguous byte array, appending +-- the given separator byte after each path. +-- +-- Commonly used separators: +-- +-- * @0@ : NUL-terminated paths +-- * @10@ : newline-separated paths +-- +-- The first argument specifies the initial output buffer size in bytes, if the +-- size is close to the block size it may be rounded to the block size. The +-- buffer is grown further only if even one path can not fit into it. The fold +-- terminates when no more space is left in the buffer to accomodate more +-- paths. +-- +{-# INLINE packPathsEndBy #-} +packPathsEndBy :: + MonadIO m => Int -> Word8 -> Fold m OS_PATH_TYPE (Array Word8) +packPathsEndBy bufBytes sep = Fold step initial extract extract + + where + + initialCap = MutByteArray.roundUpLargeArray (max 1 bufBytes) + + initial + | bufBytes < 0 = + error + $ "packPathsEndBy: size [" ++ show bufBytes + ++ "] must be a natural number" + | otherwise = do + mbarr <- liftIO $ MutByteArray.new' initialCap + return $ Fold.Partial (PackPathsState mbarr 0 initialCap) + + step (PackPathsState buf pos cap) (OS_PATH (Array src start end)) = + liftIO $ do + let srcLen = end - start + needed = pos + srcLen + 1 -- +1 for the separator byte + (buf1, cap1) <- + if cap >= needed + then return (buf, cap) + else do + b <- MutByteArray.reallocSliceAs + MutByteArray.Pinned needed buf 0 pos + return (b, needed) + MutByteArray.unsafePutSlice src start buf1 pos srcLen + MutByteArray.pokeAt (pos + srcLen) buf1 sep + let pos1 = pos + srcLen + 1 + return + $ if cap1 - pos1 < max 128 (2 * srcLen) + then Fold.Done (Array buf1 0 pos1) + else Fold.Partial (PackPathsState buf1 pos1 cap1) + + extract (PackPathsState buf pos _) = return (Array buf 0 pos) + +#endif diff --git a/core/src/Streamly/Internal/FileSystem/WindowsPath.hs b/core/src/Streamly/Internal/FileSystem/WindowsPath.hs index 7ecdae7588..856e954263 100644 --- a/core/src/Streamly/Internal/FileSystem/WindowsPath.hs +++ b/core/src/Streamly/Internal/FileSystem/WindowsPath.hs @@ -404,3 +404,160 @@ splitPath (OS_PATH a) = fmap OS_PATH $ Common.splitPath Common.OS_NAME a splitExtension :: OS_PATH_TYPE -> Maybe (OS_PATH_TYPE, OS_PATH_TYPE) splitExtension (OS_PATH a) = fmap (bimap OS_PATH OS_PATH) $ Common.splitExtension Common.OS_NAME a + +------------------------------------------------------------------------------ +-- Packing paths into a UTF-8 byte array +------------------------------------------------------------------------------ + +{-# ANN type PackPathsState Fuse #-} +data PackPathsState = + PackPathsState !MutByteArray !Int !Int -- buf, pos, cap + +-- XXX We can allocate the array on the first input. + +-- | Encodes a stream of file paths into a contiguous byte array with UTF-8 +-- encoding, appending the given separator byte after each path. +-- +-- The input 'WindowsPath' is UTF-16LE encoded; this fold decodes the +-- Word16s and re-encodes them as UTF-8. Invalid code units (lone or invalid +-- surrogates, or input underflow on a high surrogate at end of path) are +-- replaced with the Unicode replacement character U+FFFD. +-- +-- Commonly used separators: +-- +-- * @0@ : NUL-terminated paths +-- * @10@ : newline-separated paths +-- +-- The first argument specifies the initial output buffer size in bytes, if the +-- size is close to the block size it may be rounded to the block size. The +-- buffer is grown further only if even one path can not fit into it. The fold +-- terminates when no more space is left in the buffer to accomodate more +-- paths. +{-# INLINE packPathsEndBy #-} +packPathsEndBy :: + MonadIO m => Int -> Word8 -> Fold m OS_PATH_TYPE (Array Word8) +packPathsEndBy bufBytes sep = Fold step initial extract extract + + where + + initialCap = MutByteArray.roundUpLargeArray (max 1 bufBytes) + + initial + | bufBytes < 0 = + error + $ "packPathsEndBy: size [" ++ show bufBytes + ++ "] must be a natural number" + | otherwise = do + mbarr <- liftIO $ MutByteArray.new' initialCap + return $ Fold.Partial (PackPathsState mbarr 0 initialCap) + + -- UTF-8 encode a code point at dst[off..]; returns the offset past the + -- bytes written. + {-# INLINE encodeCodePoint #-} + encodeCodePoint dst off cp + | cp < 0x80 = do + MutByteArray.pokeAt off dst (fromIntegral cp :: Word8) + return (off + 1) + | cp < 0x800 = do + MutByteArray.pokeAt off dst + (fromIntegral ((cp `shiftR` 6) + 0xC0) :: Word8) + MutByteArray.pokeAt (off + 1) dst + (fromIntegral ((cp .&. 0x3F) + 0x80) :: Word8) + return (off + 2) + | cp < 0x10000 = do + MutByteArray.pokeAt off dst + (fromIntegral ((cp `shiftR` 12) + 0xE0) :: Word8) + MutByteArray.pokeAt (off + 1) dst + (fromIntegral (((cp `shiftR` 6) .&. 0x3F) + 0x80) :: Word8) + MutByteArray.pokeAt (off + 2) dst + (fromIntegral ((cp .&. 0x3F) + 0x80) :: Word8) + return (off + 3) + | otherwise = do + MutByteArray.pokeAt off dst + (fromIntegral ((cp `shiftR` 18) + 0xF0) :: Word8) + MutByteArray.pokeAt (off + 1) dst + (fromIntegral (((cp `shiftR` 12) .&. 0x3F) + 0x80) :: Word8) + MutByteArray.pokeAt (off + 2) dst + (fromIntegral (((cp `shiftR` 6) .&. 0x3F) + 0x80) :: Word8) + MutByteArray.pokeAt (off + 3) dst + (fromIntegral ((cp .&. 0x3F) + 0x80) :: Word8) + return (off + 4) + + -- U+FFFD as UTF-8: EF BF BD + {-# INLINE writeReplacement #-} + writeReplacement dst off = do + MutByteArray.pokeAt off dst (0xEF :: Word8) + MutByteArray.pokeAt (off + 1) dst (0xBF :: Word8) + MutByteArray.pokeAt (off + 2) dst (0xBD :: Word8) + return (off + 3) + + -- Convert UTF-16LE Word16s in src[srcOff..srcEnd) (byte offsets) to UTF-8 + -- bytes in dst starting at dstOff. Returns the dst offset past the last + -- byte written. The caller must ensure dst has at least 3 * srcWordLen + -- bytes of remaining capacity starting at dstOff (worst-case expansion). + convert src srcOff srcEnd dst = go srcOff + where + go !sOff !dOff + | sOff >= srcEnd = return dOff + | otherwise = do + w :: Word16 <- MutByteArray.peekAt sOff src + if w < 0xD800 || w > 0xDFFF + then do + dOff' <- encodeCodePoint dst dOff (fromIntegral w :: Int) + go (sOff + 2) dOff' + else if w <= 0xDBFF + -- high surrogate + then do + let sOff' = sOff + 2 + if sOff' >= srcEnd + then do + -- input underflow at end of path + writeReplacement dst dOff + else do + wLow :: Word16 + <- MutByteArray.peekAt sOff' src + if wLow >= 0xDC00 && wLow <= 0xDFFF + then do + let hi = fromIntegral + (w - 0xD800) :: Int + lo = fromIntegral + (wLow - 0xDC00) :: Int + cp = 0x10000 + + ((hi `shiftL` 10) .|. lo) + dOff' + <- encodeCodePoint dst dOff cp + go (sOff + 4) dOff' + else do + -- invalid low surrogate + dOff' + <- writeReplacement dst dOff + go (sOff + 2) dOff' + else do + -- lone low surrogate + dOff' <- writeReplacement dst dOff + go (sOff + 2) dOff' + + step (PackPathsState buf pos cap) (OS_PATH (Array src start end)) = + liftIO $ do + let srcByteLen = end - start + srcWordLen = srcByteLen `shiftR` 1 + -- Worst case UTF-8 expansion: each Word16 -> 3 bytes, plus + -- one separator byte. + needed = pos + 3 * srcWordLen + 1 + (buf1, cap1) <- + if cap >= needed + then return (buf, cap) + else do + b <- MutByteArray.reallocSliceAs + MutByteArray.Pinned needed buf 0 pos + return (b, needed) + pos1 <- convert src start end buf1 pos + MutByteArray.pokeAt pos1 buf1 sep + let pos2 = pos1 + 1 + utf8Len = pos2 - pos + return + $ if cap1 - pos2 < max 128 (2 * utf8Len) + then Fold.Done (Array buf1 0 pos2) + else Fold.Partial (PackPathsState buf1 pos2 cap1) + + extract (PackPathsState buf pos _) = return (Array buf 0 pos) diff --git a/core/src/Streamly/Internal/Syscall/Posix/ReadDir.hsc b/core/src/Streamly/Internal/Syscall/Posix/ReadDir.hsc index 7c3b7c3658..b293621505 100644 --- a/core/src/Streamly/Internal/Syscall/Posix/ReadDir.hsc +++ b/core/src/Streamly/Internal/Syscall/Posix/ReadDir.hsc @@ -19,6 +19,7 @@ module Streamly.Internal.Syscall.Posix.ReadDir , closeDirStream , readDirStreamEither , readEitherChunks + , readEitherFold , readEitherByteChunks , readEitherByteChunksAt , eitherReader @@ -43,6 +44,7 @@ import Foreign.C import Foreign.Storable (poke) import Fusion.Plugin.Types (Fuse(..)) import Streamly.Internal.Data.Array (Array(..)) +import Streamly.Internal.Data.Fold.Type (Fold(..)) import Streamly.Internal.Data.MutByteArray (MutByteArray) import Streamly.Internal.Data.Scanl (Scanl) import Streamly.Internal.Data.Stream (Stream(..), Step(..)) @@ -55,6 +57,7 @@ import Streamly.Internal.FileSystem.PosixPath (PosixPath(..)) import System.Posix.Types (Fd(..)) import qualified Streamly.Internal.Data.Array as Array +import qualified Streamly.Internal.Data.Fold.Type as Fold import qualified Streamly.Internal.Data.MutByteArray as MutByteArray import qualified Streamly.Internal.Data.Unfold as UF (bracketIO) import qualified Streamly.Internal.FileSystem.PosixPath as Path @@ -479,6 +482,164 @@ readEitherChunks confMod alldirs = then return $ Skip (ChunkStreamInit xs dirs ndirs files nfiles) else liftIO $ throwErrno "readEitherChunks" +{-# ANN type ChunkFoldStreamState Fuse #-} +data ChunkFoldStreamState fs b = + -- | Fold not yet initialized. Fields: input dirs, buffered output + -- dirs, output dir count. + ChunkFoldStart [PosixPath] [PosixPath] Int + -- | Fold initialized; need to open next dir. Fields: input dirs, + -- buffered output dirs, output dir count, fold state, whether the + -- current fold instance has consumed any input. + | ChunkFoldNext [PosixPath] [PosixPath] Int fs Bool + -- | Iterating in a dir. + | ChunkFoldLoop + PosixPath -- current dir path + [PosixPath] -- remaining input dirs + (Ptr CDir) -- current dir stream + [PosixPath] -- buffered output dirs + Int -- output dir count + fs -- fold state + Bool -- whether the fold has consumed any input + -- | Fold returned Done while in a dir loop; re-init and continue. + | ChunkFoldLoopReinit + PosixPath + [PosixPath] + (Ptr CDir) + [PosixPath] + Int + -- | Yield a value, then transition to the wrapped state. + | ChunkFoldYield (Either [PosixPath] b) (ChunkFoldStreamState fs b) + | ChunkFoldStop + +-- XXX the basic readdir can emit (Path, Maybe FileType) tuples, the fold can +-- be implemented on top of that. The stat for followSymlinks can also be done +-- by the recursive traversal handler. That will be more modular. +-- +-- XXX Like readEitherByteChunks we may want to splitHalf the pending work +-- items and return that whenever the buffer is filled, so that other workers +-- can pick it up. + +-- | Like 'readEitherChunks' but collects entries using a 'Fold' instead of +-- buffering into a list. All entries (both directories and files) are fed to +-- the fold. The fold output @b@ is yielded as @Right@ each time the fold +-- terminates; directory paths are additionally yielded as @Left@ in chunks so +-- the caller can recurse into them. When the traversal completes, the fold's +-- @final@ is invoked and its result is yielded only if the fold consumed at +-- least one path (matching 'Streamly.Internal.Data.Stream.foldMany' +-- semantics). +{-# INLINE readEitherFold #-} +readEitherFold + :: MonadIO m + => (ReadOptions -> ReadOptions) + -> [PosixPath] + -> Fold m PosixPath b + -> Stream m (Either [PosixPath] b) +readEitherFold confMod alldirs (Fold fstep finitial _ ffinal) = + Stream step (ChunkFoldStart alldirs [] 0) + + where + + conf = confMod defaultReadOptions + + -- We want to keep the dir batching as low as possible for better + -- concurrency esp when the number of dirs is low. + dirMax = 4 + + step _ (ChunkFoldYield x next) = return $ Yield x next + + step _ ChunkFoldStop = return Stop + + step _ (ChunkFoldStart xs dirs ndirs) = do + r <- finitial + case r of + Fold.Done b -> + return $ Yield (Right b) (ChunkFoldStart xs dirs ndirs) + Fold.Partial fs -> + return $ Skip (ChunkFoldNext xs dirs ndirs fs False) + + step _ (ChunkFoldLoopReinit curdir xs dirp dirs ndirs) = do + r <- finitial + case r of + Fold.Done b -> + return $ Yield (Right b) + (ChunkFoldLoopReinit curdir xs dirp dirs ndirs) + Fold.Partial fs -> + return $ Skip (ChunkFoldLoop curdir xs dirp dirs ndirs fs False) + + step _ (ChunkFoldNext (x:xs) dirs ndirs fs nonEmpty) = do + DirStream dirp <- liftIO $ openDirStream x + return $ Skip (ChunkFoldLoop x xs dirp dirs ndirs fs nonEmpty) + + step _ (ChunkFoldNext [] dirs _ fs nonEmpty) = do + b <- ffinal fs + case (nonEmpty, dirs) of + (False, []) -> return Stop + (False, _) -> return $ Yield (Left dirs) ChunkFoldStop + (True, []) -> return $ Yield (Right b) ChunkFoldStop + (True, _) -> + return $ Yield (Right b) + (ChunkFoldYield (Left dirs) ChunkFoldStop) + + step _ st@(ChunkFoldLoop curdir xs dirp dirs ndirs fs nonEmpty) = do + liftIO resetErrno + dentPtr <- liftIO $ c_readdir dirp + if (dentPtr /= nullPtr) + then do + let dname = #{ptr struct dirent, d_name} dentPtr + dtype :: #{type unsigned char} <- + liftIO $ #{peek struct dirent, d_type} dentPtr + + etype <- liftIO $ getEntryType conf curdir dname dtype + case etype of + EntryIsDir -> do + path <- liftIO $ Path.appendCString curdir dname + let dirs1 = path : dirs + ndirs1 = ndirs + 1 + r <- fstep fs path + case r of + Fold.Done b -> + if ndirs1 >= dirMax + then return $ Yield (Right b) + (ChunkFoldYield (Left dirs1) + (ChunkFoldLoopReinit + curdir xs dirp [] 0)) + else return $ Yield (Right b) + (ChunkFoldLoopReinit + curdir xs dirp dirs1 ndirs1) + Fold.Partial fs1 -> + if ndirs1 >= dirMax + then return $ Yield (Left dirs1) + (ChunkFoldLoop + curdir xs dirp [] 0 fs1 True) + else return $ Skip + (ChunkFoldLoop + curdir xs dirp dirs1 ndirs1 fs1 True) + EntryIsNotDir -> do + path <- liftIO $ Path.appendCString curdir dname + r <- fstep fs path + case r of + Fold.Done b -> + return $ Yield (Right b) + (ChunkFoldLoopReinit + curdir xs dirp dirs ndirs) + Fold.Partial fs1 -> + return $ Skip + (ChunkFoldLoop + curdir xs dirp dirs ndirs fs1 True) + EntryIgnored -> return $ Skip st + else do + errno <- liftIO getErrno + if (errno == eINTR) + then return $ Skip st + else do + let (Errno n) = errno + -- XXX Exception safety: fold state is not cleaned up if we + -- throw here. + liftIO $ closeDirStream (DirStream dirp) + if (n == 0) + then return $ Skip (ChunkFoldNext xs dirs ndirs fs nonEmpty) + else liftIO $ throwErrno "readEitherFold" + foreign import ccall unsafe "string.h memcpy" c_memcpy :: Ptr Word8 -> Ptr Word8 -> CSize -> IO (Ptr Word8) diff --git a/core/src/Streamly/Internal/Syscall/Windows/ReadDir.hsc b/core/src/Streamly/Internal/Syscall/Windows/ReadDir.hsc index 1e7123c921..0517c2025b 100644 --- a/core/src/Streamly/Internal/Syscall/Windows/ReadDir.hsc +++ b/core/src/Streamly/Internal/Syscall/Windows/ReadDir.hsc @@ -16,6 +16,7 @@ module Streamly.Internal.Syscall.Windows.ReadDir , closeDirStream , readDirStreamEither , readEitherChunks + , readEitherFold , readEitherByteChunks , eitherReader , reader @@ -39,6 +40,7 @@ import Foreign.C import Fusion.Plugin.Types (Fuse(..)) import Numeric (showHex) import Streamly.Internal.Data.Array (Array(..)) +import Streamly.Internal.Data.Fold.Type (Fold(..)) import Streamly.Internal.Data.MutByteArray (MutByteArray) import Streamly.Internal.Data.Unfold.Type (Unfold(..)) import Streamly.Internal.Data.Stream (Stream(..), Step(..)) @@ -47,6 +49,7 @@ import Streamly.Internal.FileSystem.WindowsPath (WindowsPath(..)) import System.IO.Error (ioeSetErrorString) import qualified Streamly.Internal.Data.Array as Array +import qualified Streamly.Internal.Data.Fold.Type as Fold import qualified Streamly.Internal.Data.MutArray as MutArray import qualified Streamly.Internal.Data.MutByteArray as MutByteArray import qualified Streamly.Internal.Data.Unfold as UF (bracketIO) @@ -416,6 +419,164 @@ readEitherChunks _confMod alldirs = liftIO $ closeDirStream ds return $ Skip (ChunkStreamInit xs dirs ndirs files nfiles) +{-# ANN type ChunkFoldStreamState Fuse #-} +data ChunkFoldStreamState fs b = + -- | Fold not yet initialized. Fields: input dirs, buffered output + -- dirs, output dir count. + ChunkFoldStart [WindowsPath] [WindowsPath] Int + -- | Fold initialized; need to open next dir. Fields: input dirs, + -- buffered output dirs, output dir count, fold state, whether the + -- current fold instance has consumed any input. + | ChunkFoldNext [WindowsPath] [WindowsPath] Int fs Bool + -- | Iterating in a dir. + | ChunkFoldLoop + WindowsPath -- current dir path + [WindowsPath] -- remaining input dirs + DirStream -- current dir stream + [WindowsPath] -- buffered output dirs + Int -- output dir count + fs -- fold state + Bool -- whether the fold has consumed any input + -- | Fold returned Done while in a dir loop; re-init and continue. + | ChunkFoldLoopReinit + WindowsPath + [WindowsPath] + DirStream + [WindowsPath] + Int + -- | Yield a value, then transition to the wrapped state. + | ChunkFoldYield (Either [WindowsPath] b) (ChunkFoldStreamState fs b) + | ChunkFoldStop + +-- | Like 'readEitherChunks' but collects entries using a 'Fold' instead of +-- buffering into a list. All entries (both directories and files) are fed to +-- the fold. The fold output @b@ is yielded as @Right@ each time the fold +-- terminates; directory paths are additionally yielded as @Left@ in chunks so +-- the caller can recurse into them. When the traversal completes, the fold's +-- @final@ is invoked and its result is yielded only if the fold consumed at +-- least one path (matching 'Streamly.Internal.Data.Stream.foldMany' +-- semantics). +{-# INLINE readEitherFold #-} +readEitherFold + :: MonadIO m + => (ReadOptions -> ReadOptions) + -> [WindowsPath] + -> Fold m WindowsPath b + -> Stream m (Either [WindowsPath] b) +readEitherFold _confMod alldirs (Fold fstep finitial _ ffinal) = + Stream step (ChunkFoldStart alldirs [] 0) + + where + + -- We want to keep the dir batching as low as possible for better + -- concurrency esp when the number of dirs is low. + dirMax = 4 + + -- Returns Just (dname, dattrs) on success, Nothing at end of stream. + readNextEntry (DirStream (h, ref, fdata)) = + withForeignPtr fdata $ \ptr -> do + firstTime <- readIORef ref + success <- + if firstTime + then writeIORef ref False >> return True + else c_FindNextFileW h ptr + if success + then do + let dname = #{ptr WIN32_FIND_DATAW, cFileName} ptr + dattrs :: #{type DWORD} <- + #{peek WIN32_FIND_DATAW, dwFileAttributes} ptr + return (Just (dname, dattrs)) + else do + err <- getLastError + if err == (# const ERROR_NO_MORE_FILES ) + then return Nothing + else Win32.failWith "findNextFile" err + + step _ (ChunkFoldYield x next) = return $ Yield x next + + step _ ChunkFoldStop = return Stop + + step _ (ChunkFoldStart xs dirs ndirs) = do + r <- finitial + case r of + Fold.Done b -> + return $ Yield (Right b) (ChunkFoldStart xs dirs ndirs) + Fold.Partial fs -> + return $ Skip (ChunkFoldNext xs dirs ndirs fs False) + + step _ (ChunkFoldLoopReinit curdir xs ds dirs ndirs) = do + r <- finitial + case r of + Fold.Done b -> + return $ Yield (Right b) + (ChunkFoldLoopReinit curdir xs ds dirs ndirs) + Fold.Partial fs -> + return $ Skip (ChunkFoldLoop curdir xs ds dirs ndirs fs False) + + step _ (ChunkFoldNext (x:xs) dirs ndirs fs nonEmpty) = do + ds <- liftIO $ openDirStream x + return $ Skip (ChunkFoldLoop x xs ds dirs ndirs fs nonEmpty) + + step _ (ChunkFoldNext [] dirs _ fs nonEmpty) = do + b <- ffinal fs + case (nonEmpty, dirs) of + (False, []) -> return Stop + (False, _) -> return $ Yield (Left dirs) ChunkFoldStop + (True, []) -> return $ Yield (Right b) ChunkFoldStop + (True, _) -> + return $ Yield (Right b) + (ChunkFoldYield (Left dirs) ChunkFoldStop) + + step _ st@(ChunkFoldLoop curdir xs ds dirs ndirs fs nonEmpty) = do + r <- liftIO $ readNextEntry ds + case r of + Just (dname, dattrs) -> + if (dattrs .&. (#const FILE_ATTRIBUTE_DIRECTORY) /= 0) + then do + isMeta <- liftIO $ isMetaDir dname + if isMeta + then return $ Skip st + else do + path <- liftIO $ appendW16CString curdir dname + let dirs1 = path : dirs + ndirs1 = ndirs + 1 + r1 <- fstep fs path + case r1 of + Fold.Done b -> + if ndirs1 >= dirMax + then return $ Yield (Right b) + (ChunkFoldYield (Left dirs1) + (ChunkFoldLoopReinit + curdir xs ds [] 0)) + else return $ Yield (Right b) + (ChunkFoldLoopReinit + curdir xs ds dirs1 ndirs1) + Fold.Partial fs1 -> + if ndirs1 >= dirMax + then return $ Yield (Left dirs1) + (ChunkFoldLoop + curdir xs ds [] 0 fs1 True) + else return $ Skip + (ChunkFoldLoop + curdir xs ds dirs1 ndirs1 fs1 True) + else do + path <- liftIO $ appendW16CString curdir dname + r1 <- fstep fs path + case r1 of + Fold.Done b -> + return $ Yield (Right b) + (ChunkFoldLoopReinit + curdir xs ds dirs ndirs) + Fold.Partial fs1 -> + return $ Skip + (ChunkFoldLoop + curdir xs ds dirs ndirs fs1 True) + Nothing -> do + -- XXX Exception safety: fold state is not cleaned up if + -- closeDirStream throws. + liftIO $ closeDirStream ds + return $ Skip (ChunkFoldNext xs dirs ndirs fs nonEmpty) + ------------------------------------------------------------------------------ -- Chunked byte-buffered reads ------------------------------------------------------------------------------ diff --git a/docs/Developer/FileSystem.DirIO.md b/docs/Developer/FileSystem.DirIO.md new file mode 100644 index 0000000000..462cd91021 --- /dev/null +++ b/docs/Developer/FileSystem.DirIO.md @@ -0,0 +1,390 @@ +# Directory traversal API design notes + +## Filesystem functionality modules + +The DirIO module mainly provides "readdir" functionality. File stat +functionality is coupled to readdir because we may return file stats along +with filepaths, or may provide functionality to filter based on stats. + +The FileTest module in streamly-coreutils package provides the file stat read +operations, we may have to bring that here if some coupling with readdir is +needed. + +FileIO module provides regular file create operation. + +## Traversal vs Output control + +There are two dimensions to recursive directory reading APIs, traversal and +filtering. Traversal controls how we recurse further down in a directory +tree; we may need to allow certain traversals and prune others. On the +other hand, output filtering controls what we want to return in the output +stream e.g. we can filter based on type (dir, file, symlink) or based on +the filename etc. These two dimensions can be completely independent of +each other. + +## Returning metadata or using config options + +A design point is whether to return metadata for files and let consumers do +filtering based on the metadata, or to use config options/predicates inside +readdir itself and return only transformed results as output. Using config +predicates keeps the API and stream representation cleaner but is not as +powerful as emitting the metadata. + +If we want to expose metadata, we may need different APIs: without +metadata, with fast metadata, and with full metadata. In each case the fold +input would be different. + +* readMinimal: read only path names, no metadata +* readStandard: read path and minimal metadata +* readFull: read full metadata + +Full metadata can always be read by mapping a stat call on a stream of paths +rather than integrating it into readdir itself. The advantage of doing it in +readdir is in the case when we need to do a stat in readdir anyway e.g. when +the file system does not provide the type, or when followSymlinks is "on". + +## Passing a scan to readdir or using config options + +Even if we want filtering inside readdir, there are two options: + +* pass config predicates/options +* pass a scan/fold which can directly consume metadata and return output + +Passing a scan is similar to exposing metadata, but only to a scan that is +passed directly to readdir. + +By passing a scan we can process output right at the source and produce a +cooked output. Otherwise we may have to produce a stream of intermediate +structures which may have more per-item overhead and that overhead may not +get eliminated by fusion. For example, a fold can directly write the +CString from readdir to the output buffer whereas if we output a Path then +we incur overhead of an intermediate structure. + +However, such a fold must not be allowed to retain the CString pointer from +readdir because that memory is owned internally and may be released after +readdir returns. This can be enforced by fixing the output type to Path, +which copies data into managed memory (e.g. MutByteArray-backed Path). + +Another drawback of passing a fold is that it exposes low-level details +like CString and Dirent from readdir for a relatively small performance +gain. + +If we want only filtering, a cleaner solution is to use configuration +predicates/options in readdir and keep the emitted stream representation +simple. However, a scan can be more powerful e.g. if we want to count how +many files of each type are present in a directory we can do that with a +scan or by emitting the metadata. + +## Traversal State vs Output + +There are two ways to handle traversal. One approach is to segregate the output +into two parts: one for terminal nodes and another for directory nodes to be +traversed further. The final output then combines these two parts to produce +the complete result. The other approach is to output all child nodes in the +output part, while also placing the nodes to be traversed further in the +traversal part. In this case, the output is duplicated, which could become +significant if there are a large number of directories. + +However, there are a couple of problems with the segregation approach. During +concurrent processing, if a worker finds that it is busy generating a large +amount of output from one directory, it may return the remaining work so that +other workers can pick it up. As a result, the same directories can appear +multiple times in the traversal part. If we also include them in the output, +this will lead to duplicate entries in the final output. Additionally, for +concurrent evaluation we may divide the traversal into arbitrarily small parts, +which can nullify the chunking benefit because we ultimately have to recombine +all these small parts into larger chunks again. + +Therefore, it may be better to place everything in the output from the +beginning and keep the traversal output completely independent of the normal +output. + +The readdir APIs conceptually produce two independent outputs: + +* traversal state (directories to recurse into) +* observable output + +The output is essentially two streams via an Either type: + +* Left values are traversal state (directories selected for recursion) +* Right values are observable output (files and dirs both) + +The Left channel controls traversal and can have independent filters in +readdir options. The Right channel controls emitted output and is filtered +independently. + +This separation keeps traversal control orthogonal to output filtering and +allows the low-level scanner to emit both efficiently in a single pass. + +## Traversal Control + +There are two levels of traversal control possible. + +One level depends on state/information naturally available only to readdir, +e.g. follow-symlink behavior because file type/stat information is available +during scanning. + +Another level is naturally handled outside readdir by the recursion control +combinator (e.g. concatIterate), such as filtering based on: + +* traversal depth +* directory names +* path patterns + +## Output Control + +Again there are two levels. + +Control based on information naturally available only to readdir (e.g. file +type/stat data) can be implemented via config predicates/options to +readdir. + +Control based purely on path information (e.g. filename, extension, regex, +glob patterns etc.) can be implemented by filtering the stream returned by +readdir. + +If the stream remains lazy with good fusion, we should not lose significant +performance with this modular design. + +Chunking of traversal output and normal output can also be implemented +outside readdir over the stream. We should compare performance of chunking +inside readdir vs outside to evaluate whether a monolithic design provides +measurable gains. + +## Stat based filtering for traversal and output streams + +In case of Windows along with file type the API provides other attributes +like times, size, readonly which are generally available only by using stat +on Posix. We can use predicates to filter based on these as well. In case of +Posix we will have to dynamically decide whether to stat or not to use these +predicates. One effect of that is if someone uses these predicates the +performance will suddenly drop, we need to document that. + +Another point is any of these can be used for traversal filtering of +directories as well. So do we need separate option arguments for traversal +and output control or just different names for the filters? Such a filter +can be just one composed function so we do not need many options, it will be +just one filter for traversal and one for output. + +## Symlink Traversal Mechanism + +This is part of traversal configuration controlled by followSymlinks option. + +Currently, for the traversal root we always follow symlinks, there is no +option to change that. We control the follow symlinks behavior by +controlling the traversal stream contents. + +For recursive traversal, we emit directories in the traversal stream, +therefore, we need to know the type of each directory entry, if it is a dir +then it is emitted in the traversal stream otherwise in the regular output +stream. When the type of directory entries is not available we use an +explicit stat call. When following symlinks, even if we know the types of +directory entries we need to determine the type of the link destination +using an explicit stat call and classify it as dir or not dir. + +Alternatively, instead of classifying the dirents using stat, we can emit +files that potentially may be dirs in the traversal output, the recursion +handler will then feed those to readdir again, at that time we can deal with +ENOTDIR when doing an opendir. We can just ignore that error if it is not a +dir. This way we do not need to do an additional stat for symlinks, the stat +happens when we are trying to read it as dir, therefore, we can save a call +in case the symlink is a dir. For using this strategy, we will have to +translate the followSymlinks option to O_NOFOLLOW open flag. Using that +strategy we can classify symlinks or DT_UNKNOWN as potential dirs. Not sure +what potential problems can arise if we follow this. + +## Symlink Resolution + +In general, instead of fully resolving all indirections in a symlink at once +we can resolve it one step at a time utilizing the recursion in the +traversal handler. For example, we can find the link destination and emit it +in the traversal stream as a potential recursion candidate. This could be +useful in a general graph traversal, however, in case of file system +symlinks resolving them up to a non-symlink destination and then determining +whether it is a dir or not is more efficient. + +## Handling symlink errors + +When resolving a symlink we may encounter errors only if a directory entry +is a symlink. If the directory entry is not a symlink then stat on it will +usually succeed, we know it exists and it has permissions because the path +resolution goes through this directory which we just read, it will not give +ENOENT unless the file was modified, deleted or recreated after we read the +dirent. + +For symlink resolution we may encounter, missing file, permission issues, +cycles. For that we need options to control the behavior. + +## Errors during opendir + +The same errors that apply to symlink resolution also apply to opendir for +the directory we are reading. These errors are especially important to +control during recursive traversal, probably not very useful for reading a +single dir. If a directory in the traversal stream went away or does not +have permission to read we may want to ignore those errors for recursive +traversal. This handling may be separate from the similar error handling for +symlink resolution which applies only when we are following symlinks. + +The error handling for the traversal root and for reading subsequent +subdirectories may need to be different. For example, we want to know any +errors when opening the root itself but may want to ignore errors on +recursive travesal. This will have to be handled by the traversal handler. +The traversal handler can check the errors on the root separately and then +use error suppression on the entire traversal. + +If we use a followSymlinks control at opendir, then the traversal handler +can resolve the root symlink separately and then do the entire traversal +with noFollowSymlinks. But if we do that the path may resolve to a different +prefix and we may want to emit paths with the original non-resolved prefix, +so it is better for readdir to return relative paths rather than prefixed +paths, or we should have the ability to supply a custom prefix to readdir. + +We may throw errors and leave the handling to the traversal handler, but +that may less efficient and less ergonomic. Readdir will always have to +be wrapped into an exception handler, should measure if that works as well. + +## Cyclic symlink paths + +This applies only when following symlinks during traversal. + +One way to curtail cyclic symlink traversal is relying on ELOOP, which +aborts traversal after too many levels of indirection. + +If a loop is found we can ignore ELOOP and continue, however, it may be +inefficient because the cycle may be encountered repeatedly from different +nodes in the graph. + +## Detecting Cycles + +If we do not want to use ELOOP for some reason, or if we want a cycle detection +for graph traversal in general we can use a generic mechanism by tracking +visited inode/device or paths. Cycle detection can be done in the traversal +combinator (concatIterate). + +When we encounter a symlink we resolve it into a canonical path, and then check +if the canonical path has already been visited. The canonical path serves as +the unique label of the node in the graph. + +If we know unique labels identifying nodes in the graph we can maintain a bloom +filter and a hash table to find visited nodes quickly. If the nodes are +contiguous integers then a bitmap is enough. + +## Broken symlinks + +This applies only when following symlinks during traversal. + +If there are broken symlinks, we have two options, check it even before +emitting the dir in traversal output or check it when we perform readdir. +Postponing it may be better because we may not even need it. But it can also +be done via traversal predicates to check whether file exists or not. + +Another point is whether we handle broken links via an explicit option or +via stat based traversal predicates. If we do it via predicates then it will +require full resolution of the link recursively, detecting ELOOP right +there, if we resolve only once then readdir will anyway have to resolve +again when the path is sent back for reading via traversal handler. One way +is to let readdir resolve once and emit the result in the traversal output, +but that will create recursion cycles that will have to be detected in the +traversal handler and it will also be inefficient because of too many +resolution steps. Though something like this can make sense in handling http +crawler redirections. + +## Symlink resolved vs unresolved paths + +When a symlink is encountered, should we emit its path through the traversal +root or should we use the resolved path which could be any arbitrary path. +Should we control this with options? + +If we use resolved paths then the cost of resolving will not be paid during +further traversals, but that may or may not be significant. + +Also, if we use the resolved path then the traversal depth will have to be +maintained, we cannot determine it by just counting the components after +removing the traversal root. + +## Traversal Depth + +How is depth defined? + +To keep things simple, traversal depth is defined as the number of recursive +traversal steps from the starting traversal root. A symlink resolution is +counted as only one step even if it involves many indirections. + +Readdir itself is unaware of traversal depth. We pass it a parent directory +and it reads the children. Recursing into one of the children counts as +one level deeper. + +If descendant paths are constructed incrementally by appending child names +returned by readdir, and readdir never returns "." or ".." entries, then +recursion depth is identical to lexical path depth relative to the traversal +root. + +Therefore, traversal depth can be computed statelessly from constructed +descendant paths by stripping the traversal root prefix and counting path +segments. This can be checked before descending further. + +It does not matter whether the traversal root itself contains ".." +segments or symlinks, as long as descendant paths are constructed only by +appending returned child names. + +However, this approach has a disadvantage, if a child is a symlink + +## Full vs Relative Paths + +Paths returned by "read" can be: + +* absolute (/x/y/z) +* relative to cwd (./y/z) +* relative path segments (y/z) + +To accommodate all cases we can provide a prefix to readdir to attach while +generating paths. For example, the prefix would be "/x" in the previous +example. + +Alternatively, higher layers could attach prefixes, but generating the +final path once inside readdir is likely more efficient than rebuilding it +later. + +Emitting paths as a tuple (parent, child) may be more efficient because the +parent path can be shared instead of copied into every emitted path. + +One advantage of a lower-level API emitting tuples is that construction of +full paths can be deferred until chunk creation or final output buffering +downstream. If the lower-level API itself emits full paths and downstream +processing copies those paths again into chunk buffers, then the earlier +path-prefix copy may become additional overhead. + +Therefore, one possible design is for the lowest-level API to always emit +(parent, child) tuples, while higher-level APIs expose either: + +* relative paths +* full prefixed paths + +generated from the tuple representation. + +However, tuple representation also complicates downstream stream processing +because consumers often require self-contained full paths and would need to +repeatedly reconstruct them. + +Alternatively, readdir can directly emit either relative or full paths based +on configuration options. This keeps the stream representation simpler and +more ergonomic, though it may lose the advantage of deferring parent-prefix +copying until final chunk construction. + +Need to benchmark whether deferred parent copying provides measurable gains +compared to the simpler direct full-path representation. + +## Option structure + +The options that apply only when followSymlinks is "on" can be passed as an +argument to (followSymlinks SymlinkOptions), for default we can use +(followSymlinks id), to disable we can use noFollowSymlinks. Or just keep a +flat structure and these options do nothing when not following symlinks. + +## Benchmark workloads + +For symlink heavy workload we can try using a nix store. + +For stat efficiency we need to use a filesystem/OS that does not provide the +type of file during readdir. + diff --git a/hie.yaml b/hie.yaml index 085a65e20a..dfb0ac6157 100644 --- a/hie.yaml +++ b/hie.yaml @@ -12,6 +12,12 @@ cradle: cabal: - path: "./core/src" component: "lib:streamly-core" + - path: "./bench-test-lib" + config: + cradle: + cabal: + - path: "./bench-test-lib/src/" + component: "lib:bench-test-lib" - path: "./benchmark" config: cradle: @@ -214,6 +220,7 @@ cradle: dependencies: - streamly.cabal - ./core/streamly-core.cabal + - ./bench-test-lib/bench-test-lib.cabal - ./benchmark/streamly-benchmarks.cabal - ./test/streamly-tests.cabal - hie.yaml diff --git a/test/Streamly/Test/FileSystem/DirIO.hs b/test/Streamly/Test/FileSystem/DirIO.hs index ce660f672e..2ee684f5e7 100644 --- a/test/Streamly/Test/FileSystem/DirIO.hs +++ b/test/Streamly/Test/FileSystem/DirIO.hs @@ -19,10 +19,7 @@ module Main (main) where import Data.Word (Word8) import GHC.IO.Encoding (setLocaleEncoding, utf8) -#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__) import Streamly.Data.Array (Array) -#endif -import System.Directory (createDirectoryLink) import System.FilePath (()) import System.IO.Temp (withSystemTempDirectory) @@ -31,7 +28,11 @@ import qualified Streamly.Internal.Unicode.Stream as Unicode (lines) import qualified Streamly.Data.Stream.Prelude as Stream import qualified Streamly.Data.Fold as Fold import qualified Streamly.Data.StreamK as StreamK + +#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__) +import System.Directory (createDirectoryLink) import qualified Streamly.FileSystem.DirIO as Dir +#endif import Prelude hiding (last, length) import BenchTestLib.DirIO @@ -57,7 +58,6 @@ testCorrectness expectation lister = do $ Unicode.decodeUtf8 lister reality `shouldBe` expectation -#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__) testCorrectnessByteChunked :: [FilePath] -> Stream.Stream IO (Array Word8) -> Expectation testCorrectnessByteChunked expectation lister = do @@ -69,8 +69,8 @@ testCorrectnessByteChunked expectation lister = do $ Unicode.lines Fold.toList $ Unicode.decodeUtf8Chunks lister reality `shouldBe` expectation -#endif +#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__) testSymLinkFollow :: FilePath -> Spec testSymLinkFollow tmpDir = do let fp = tmpDir "dir-structure-small-sym" @@ -130,6 +130,7 @@ testSymLinkFollow tmpDir = do . Dir.ignoreSymlinkLoops False ) fp) +#endif -- | List the current directory recursively runTests :: FilePath -> IO () @@ -154,11 +155,12 @@ runTests tmpDir = do hspec $ describe moduleName $ do describe "Sanity" $ do -#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__) it "listDirByteChunked" $ testCorrectnessByteChunked (tail pathsBig) (listDirByteChunked bigTree) -#endif + it "listDirByteChunkedFold" $ + testCorrectnessByteChunked + (tail pathsBig) (listDirByteChunkedFold bigTree) -- NOTE: The BFS traversal fails with: -- openDirStream: resource exhausted (Too many open files) -- if a bigger directory tree is used @@ -200,7 +202,26 @@ runTests tmpDir = do testCorrectness pathsBig (listDirChunkParInterleaved id bigTree) it "listDirChunkParOrdered" $ testCorrectness pathsBig (listDirChunkParOrdered id bigTree) + it "listDirChunkFoldDfs" $ + testCorrectness (tail pathsBig) (listDirChunkFoldDfs id bigTree) + it "listDirChunkFoldBfs" $ + testCorrectness (tail pathsSmall) (listDirChunkFoldBfs id smallTree) + it "listDirChunkFoldBfsRev" $ + testCorrectness (tail pathsSmall) (listDirChunkFoldBfsRev id smallTree) + it "listDirChunkFoldAppend" $ + testCorrectness (tail pathsBig) (listDirChunkFoldAppend id bigTree) + it "listDirChunkFoldInterleave" $ + testCorrectness (tail pathsBig) (listDirChunkFoldInterleave id bigTree) + it "listDirChunkFoldPar" $ + testCorrectness (tail pathsBig) (listDirChunkFoldPar id bigTree) + it "listDirChunkFoldParInterleaved" $ + testCorrectness + (tail pathsBig) (listDirChunkFoldParInterleaved id bigTree) + it "listDirChunkFoldParOrdered" $ + testCorrectness (tail pathsBig) (listDirChunkFoldParOrdered id bigTree) +#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__) testSymLinkFollow tmpDir +#endif -- | List the current directory recursively main :: IO () diff --git a/test/streamly-tests.cabal b/test/streamly-tests.cabal index 322625aae9..287e92529b 100644 --- a/test/streamly-tests.cabal +++ b/test/streamly-tests.cabal @@ -15,6 +15,11 @@ flag limit-build-mem manual: True default: False +flag limit-test-mem + description: Limits the RTS heap to 2 GB when running the tests + manual: True + default: True + flag use-large-mem description: Include tests that require large amount of memory manual: True @@ -206,6 +211,8 @@ common test-options -rtsopts -with-rtsopts "-t" -fno-ignore-asserts + if flag(limit-test-mem) + ghc-options: -with-rtsopts=-M2G include-dirs: . build-depends: streamly-tests @@ -221,6 +228,8 @@ common always-optimized -rtsopts -with-rtsopts "-t" -fno-ignore-asserts + if flag(limit-test-mem) + ghc-options: -with-rtsopts=-M2G if flag(fusion-plugin) && !impl(ghcjs) && !impl(ghc < 8.6) ghc-options: -fplugin Fusion.Plugin @@ -549,15 +558,14 @@ test-suite FileSystem.DirIO buildable: False build-depends: bench-test-lib - -- Fix this test-suite for Windows - if os(windows) - buildable: False test-suite Network.Inet.TCP import: lib-options type: exitcode-stdio-1.0 main-is: Streamly/Test/Network/Inet/TCP.hs ghc-options: -rtsopts -fno-ignore-asserts + if flag(limit-test-mem) + ghc-options: -with-rtsopts=-M2G include-dirs: . build-depends: streamly-tests -- Cannot killThread in listen/accept on Windows threaded runtime @@ -571,6 +579,8 @@ test-suite Network.Socket type: exitcode-stdio-1.0 main-is: Streamly/Test/Network/Socket.hs ghc-options: -rtsopts -fno-ignore-asserts + if flag(limit-test-mem) + ghc-options: -with-rtsopts=-M2G include-dirs: . build-depends: streamly-tests -- Cannot killThread in listen/accept on Windows threaded runtime