Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bench-test-lib/bench-test-lib.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,6 @@ library
, streamly-core
, streamly
, directory
, filepath
hs-source-dirs: src
default-language: Haskell2010
179 changes: 163 additions & 16 deletions bench-test-lib/src/BenchTestLib/DirIO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,51 @@ module BenchTestLib.DirIO
, listDirChunkPar
, listDirChunkParInterleaved
, listDirChunkParOrdered
#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__)
, listDirChunkFoldDfs
, listDirChunkFoldBfs
, listDirChunkFoldBfsRev
, listDirChunkFoldAppend
, listDirChunkFoldInterleave
, listDirChunkFoldPar
, listDirChunkFoldParInterleaved
, listDirChunkFoldParOrdered
, listDirByteChunked
#endif
, listDirByteChunkedFold
) where

--------------------------------------------------------------------------------
-- Imports
--------------------------------------------------------------------------------

import Data.Foldable (for_)
import Data.IORef (newIORef, modifyIORef', readIORef)
import Data.Maybe (fromJust)
import Data.Word (Word8)
#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__)
import Streamly.Data.Array (Array)
#if defined(mingw32_HOST_OS) || defined(__MINGW32__)
import Data.Word (Word16)
#endif
import Streamly.Data.Array (Array)
import Streamly.Data.Stream (Stream)
import Streamly.Data.Unfold (Unfold)
import Streamly.FileSystem.Path (Path)
import Streamly.Unicode.String (str)
import System.Directory (createDirectoryIfMissing)
import Data.Foldable (for_)
import System.FilePath ((</>))

import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.Data.Array as Array
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Data.StreamK as StreamK
import qualified Streamly.Internal.Data.StreamK as StreamK
import qualified Streamly.Data.Unfold as Unfold
import qualified Streamly.Internal.Data.Unfold as Unfold
import qualified Streamly.Internal.FileSystem.DirIO as Dir
import qualified Streamly.FileSystem.Path as Path
import qualified Streamly.Internal.FileSystem.Path as Path
#if defined(mingw32_HOST_OS) || defined(__MINGW32__)
import qualified Streamly.Internal.Data.Array as Array (unsafeCast)
import qualified Streamly.Unicode.Stream as Unicode
#endif

--------------------------------------------------------------------------------
-- Helpers
Expand Down Expand Up @@ -102,7 +116,6 @@ streamDirMaybe
-> Either Path b -> Maybe (Stream IO (Either Path Path))
streamDirMaybe f = either (Just . Dir.readEitherPaths f) (const Nothing)

#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__)
_streamDirByteChunked
:: (Dir.ReadOptions -> Dir.ReadOptions)
-> Either [Path] b -> Stream IO (Either [Path] (Array Word8))
Expand All @@ -113,7 +126,16 @@ streamDirByteChunkedMaybe
-> Either [Path] b -> Maybe (Stream IO (Either [Path] (Array Word8)))
streamDirByteChunkedMaybe f =
either (Just . Dir.readEitherByteChunks f) (const Nothing)
#endif

-- | Like 'streamDirByteChunkedMaybe' but uses 'Dir.readEitherFold' with
-- 'Path.packPathsEndBy' to produce newline-terminated byte chunks.
streamDirByteChunkedFoldMaybe
:: (Dir.ReadOptions -> Dir.ReadOptions)
-> Either [Path] b -> Maybe (Stream IO (Either [Path] (Array Word8)))
streamDirByteChunkedFoldMaybe f =
either
(Just . flip (Dir.readEitherFold f) (Path.packPathsEndBy 32000 10))
(const Nothing)

streamDirChunkedMaybe
:: (Dir.ReadOptions -> Dir.ReadOptions)
Expand All @@ -125,6 +147,21 @@ streamDirChunked
-> Either [Path] b -> Stream IO (Either [Path] [Path])
streamDirChunked f = either (Dir.readEitherChunks f) (const Stream.nil)

-- | Same shape as 'streamDirChunkedMaybe' but uses 'Dir.readEitherFold' with
-- 'Fold.toList', which should be equivalent in coverage to
-- 'Dir.readEitherChunks' for traversal tests.
streamDirChunkedFoldMaybe
:: (Dir.ReadOptions -> Dir.ReadOptions)
-> Either [Path] b -> Maybe (Stream IO (Either [Path] [Path]))
streamDirChunkedFoldMaybe f =
either (Just . flip (Dir.readEitherFold f) Fold.toList) (const Nothing)

streamDirChunkedFold
:: (Dir.ReadOptions -> Dir.ReadOptions)
-> Either [Path] b -> Stream IO (Either [Path] [Path])
streamDirChunkedFold f =
either (flip (Dir.readEitherFold f) Fold.toList) (const Stream.nil)

--------------------------------------------------------------------------------
-- Functions
--------------------------------------------------------------------------------
Expand All @@ -139,20 +176,47 @@ createDirStucture root d w = do
createDirStucture_ ref parentDir depth width = do
for_ [1..width] $ \i -> do
let iStr = show i
subDir = [str|#{parentDir}/dir_#{iStr}|]
subDir = [str|#{parentDir}|] </> [str|dir_#{iStr}|]
createDirectoryIfMissing True subDir
modifyIORef' ref (subDir:)
createDirStucture_ ref subDir (depth - 1) width

#if !defined(mingw32_HOST_OS) && !defined(__MINGW32__)
-- Fastest implementation, only works for posix as of now.
-- Fastest implementation. On Windows the underlying readEitherByteChunks
-- emits UTF-16LE bytes (each char as 2 bytes, separator '\\' and newline '\n'
-- also encoded as 2 bytes). Chunks are split at entry boundaries, so each
-- chunk can be transcoded independently to UTF-8 without splitting surrogate
-- pairs. On Posix the bytes are already UTF-8.
{-# INLINE toUtf8Chunks #-}
toUtf8Chunks :: Stream IO (Array Word8) -> Stream IO (Array Word8)
#if defined(mingw32_HOST_OS) || defined(__MINGW32__)
toUtf8Chunks = Stream.mapM utf16leChunkToUtf8

utf16leChunkToUtf8 :: Array Word8 -> IO (Array Word8)
utf16leChunkToUtf8 arr =
Stream.fold Array.create
$ Unicode.encodeUtf8
$ Unicode.decodeUtf16le
$ Array.read (Array.unsafeCast arr :: Array Word16)
#else
toUtf8Chunks = id
#endif

{-# INLINE listDirByteChunked #-}
listDirByteChunked :: FilePath -> Stream IO (Array Word8)
listDirByteChunked inp = do
Stream.catRights
listDirByteChunked inp =
toUtf8Chunks
$ Stream.catRights
$ Stream.concatIterate (streamDirByteChunkedMaybe id)
$ Stream.fromPure (Left [fromJust $ Path.fromString inp])
#endif

-- | Like 'listDirByteChunked' but uses 'Dir.readEitherFold' with
-- 'Path.packPathsEndBy' to compact paths into newline-terminated byte chunks.
{-# INLINE listDirByteChunkedFold #-}
listDirByteChunkedFold :: FilePath -> Stream IO (Array Word8)
listDirByteChunkedFold inp = do
Stream.catRights
$ Stream.concatIterate (streamDirByteChunkedFoldMaybe id)
$ Stream.fromPure (Left [fromJust $ Path.fromString inp])

-- Faster than the listDir implementation below
{-# INLINE listDirChunkedWith #-}
Expand All @@ -161,19 +225,36 @@ listDirChunkedWith
-> [Char] -> Stream IO Word8
listDirChunkedWith act inp = do
Stream.unfoldEachEndBy 10 Array.reader
$ fmap (Array.asBytes . Path.toArray)
$ fmap Path.toUtf8Array
$ Stream.unfoldEach Unfold.fromList
$ fmap (either id id)
$ act
$ Stream.fromPure (Left [fromJust $ Path.fromString inp])

-- | Variant of 'listDirChunkedWith' for the 'readEitherFold' variants. The
-- fold receives both directory and file entries, so its 'Right' output already
-- contains every path; the 'Left' chunks are only used to drive further
-- traversal and must be discarded here to avoid emitting directory paths
-- twice.
{-# INLINE listDirChunkedFoldWith #-}
listDirChunkedFoldWith
:: (Stream IO (Either [Path] b) -> Stream IO (Either [Path] [Path]))
-> [Char] -> Stream IO Word8
listDirChunkedFoldWith act inp = do
Stream.unfoldEachEndBy 10 Array.reader
$ fmap Path.toUtf8Array
$ Stream.unfoldEach Unfold.fromList
$ Stream.catRights
$ act
$ Stream.fromPure (Left [fromJust $ Path.fromString inp])

{-# INLINE listDirWith #-}
listDirWith
:: (Stream IO (Either Path Path) -> Stream IO (Either Path Path))
-> [Char] -> Stream IO Word8
listDirWith act inp = do
Stream.unfoldEachEndBy 10 Array.reader
$ fmap (Array.asBytes . Path.toArray . either id id)
$ fmap (Path.toUtf8Array . either id id)
$ act
$ Stream.fromPure (Left (fromJust $ Path.fromString inp))

Expand Down Expand Up @@ -295,3 +376,69 @@ listDirChunkParOrdered
listDirChunkParOrdered f =
listDirChunkedWith
(Stream.parConcatIterate (Stream.ordered True) (streamDirChunked f))

--------------------------------------------------------------------------------
-- Chunked via readEitherFold + Fold.toList
--------------------------------------------------------------------------------

-- These mirror the listDirChunk* variants above but exercise 'readEitherFold'
-- with 'Fold.toList'. Since the fold receives all entries (dirs and files),
-- they use 'listDirChunkedFoldWith' which keeps only the 'Right' output and
-- discards the 'Left' traversal-driver chunks.

{-# INLINE listDirChunkFoldDfs #-}
listDirChunkFoldDfs
:: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8
listDirChunkFoldDfs f =
listDirChunkedFoldWith (Stream.concatIterate (streamDirChunkedFoldMaybe f))

{-# INLINE listDirChunkFoldBfs #-}
listDirChunkFoldBfs
:: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8
listDirChunkFoldBfs f =
listDirChunkedFoldWith
(Stream.bfsConcatIterate (streamDirChunkedFoldMaybe f))

{-# INLINE listDirChunkFoldBfsRev #-}
listDirChunkFoldBfsRev
:: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8
listDirChunkFoldBfsRev f =
listDirChunkedFoldWith
(Stream.altBfsConcatIterate (streamDirChunkedFoldMaybe f))

{-# INLINE listDirChunkFoldAppend #-}
listDirChunkFoldAppend
:: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8
listDirChunkFoldAppend f =
listDirChunkedFoldWith
(concatIterateWith (streamDirChunkedFold f) StreamK.append)

{-# INLINE listDirChunkFoldInterleave #-}
listDirChunkFoldInterleave
:: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8
listDirChunkFoldInterleave f =
listDirChunkedFoldWith
(mergeIterateWith (streamDirChunkedFold f) StreamK.interleave)

{-# INLINE listDirChunkFoldPar #-}
listDirChunkFoldPar
:: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8
listDirChunkFoldPar f =
listDirChunkedFoldWith
(Stream.parConcatIterate id (streamDirChunkedFold f))

{-# INLINE listDirChunkFoldParInterleaved #-}
listDirChunkFoldParInterleaved
:: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8
listDirChunkFoldParInterleaved f =
listDirChunkedFoldWith
(Stream.parConcatIterate
(Stream.interleaved True) (streamDirChunkedFold f))

{-# INLINE listDirChunkFoldParOrdered #-}
listDirChunkFoldParOrdered
:: (Dir.ReadOptions -> Dir.ReadOptions) -> [Char] -> Stream IO Word8
listDirChunkFoldParOrdered f =
listDirChunkedFoldWith
(Stream.parConcatIterate
(Stream.ordered True) (streamDirChunkedFold f))
1 change: 1 addition & 0 deletions core/src/Streamly/FileSystem/Path.hs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ module Streamly.FileSystem.Path
, unsafeJoin
, join
, joinStr
, packPathsEndBy

-- * Splitting root
, splitRoot
Expand Down
99 changes: 2 additions & 97 deletions core/src/Streamly/Internal/FileSystem/DirIO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,103 +8,7 @@
-- Maintainer : streamly@composewell.com
-- Portability : GHC
--
-- Directory traversal API design notes:
--
-- Paths returned by readdir:
-- --------------------------
-- The paths returned by "read" can be absolute (/x/y/z), relative to current
-- directory e.g. if current dir is /x then path is (./y/z), or path segments
-- relative to current dir e.g. (y/z). To accomodate all the cases we can
-- provide a prefix to readdir to attach to the paths being generated e.g. the
-- prefix would be "/x" in the previous example. Alternatively, we could take
-- the approach of the higher layer doing that, but it is more efficient to
-- allocate the path buffer once rather than modifying it later. We can do this
-- by passing a fold to transform the output.
--
-- Filtering of paths:
-- -------------------
--
-- It may be more efficient to apply a filter to the paths during readdir API
-- itself instead of applying it in a layer above. That way we cut the output
-- at the source rather than generate and then discard it later. We can do this
-- by passing a fold to filter the input.
--
-- Symlink Resolution:
-- -------------------
--
-- When reading a symlink directory we can resolve the symlink and read the
-- destination directory or we can just emit the file it is pointing to and
-- the read can happen next at the higher level, in the traversal logic
-- (concatIterate). Not sure if one approach has any significant perf impact
-- over the other. Similar thinking applies to a mount point as well. Also, if
-- we resolve the symlinks in concatIterate, then each resolution will be
-- counted as depth level increment whereas if we resolve that at lower level
-- then it won't. We can do this by passing an option to modify the behavior.
--
-- Cyclic paths:
-- -------------
--
-- When resolving cyclic directory symlinks one way to curtail it is ELOOP
-- which gives up if it encounters too many level. Another way is to use
-- the inode information to check if we are traversing an already traversed
-- inode, this is in general helpful in a graph traversal. We can ignore
-- ELOOP by passing an option but it may be inefficient because we may
-- encounter the loop from any node in the cycle.
--
-- Broken links and permission issues:
-- -----------------------------------
--
-- If we encounter an error reading a directory because of permission
-- issues should we ignore it in this low level API or catch it in the
-- higher level traversal functionality? Similarly, if there are broken
-- symlinks, where to handle the error? Need to check performance when
-- handling it in ListDir. Suppressing the error at the lower level may be
-- more efficient than propagating it up and then handling it there. We can
-- do this by passing an option.
--
-- Returning the metadata:
-- -----------------------
--
-- Specific scans can be used to return the metadata in the output stream if
-- needed. However, we may need three different APIs: without metadata, with
-- fast metadata, and with full metadata. In each case the fold input would
-- be different.
--
-- * readMinimal: read only the path names, no metadata
-- * readStandard: read the path and minimal metadata
-- * readFull: read full metadata
--
-- NOTE: Full metadata can be read by mapping a stat call to a stream of paths
-- rather than via readdir API. Does it help the performance to do it in the
-- readdir API?
--
-- Passing a scan to the readdir operation:
-- ----------------------------------------
--
-- There are two approaches to traversal and filtering. (1) Read the attributes
-- as data and provide it to a high level traversal handler to do filtering etc.
-- (2) pass a fold or scan to the reader itself which does the filtering.
--
-- By passing a scan we can process the output right at the source and produce
-- a cooked output. Otherwise we may have to produce a stream of intermediate
-- structures which may have more per item overhead and that overhead may not
-- get eliminated by fusion. For example, a fold can directly write the CString
-- from readdir to the output buffer whereas if we output the Path then we will
-- incur an overhead of intermediate structure.
--
-- Filesystem functionality modules:
-- ---------------------------------
--
-- This DirIO module mainly provides "readdir" functionality. File stat
-- functionality is coupled to readdir because we may return file stats along
-- with filepaths, or may provide functionality to filter based on stats.
--
-- The FileTest module in streamly-coreutils provides the file stat read
-- operations, we may have to bring that here if some coupling with readdir is
-- needed.
--
-- FileIO module provides regular file create operation.
--
-- See docs/Developer/FileSystem.DirIO.md for design notes.

module Streamly.Internal.FileSystem.DirIO
(
Expand All @@ -128,6 +32,7 @@ module Streamly.Internal.FileSystem.DirIO
, readEither
, readEitherPaths
, OS.readEitherChunks
, OS.readEitherFold
, OS.readEitherByteChunks
, readEitherChunksPortable

Expand Down
Loading
Loading