Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Add `DataFrame.Typed.Lazy` module — a type-safe lazy query pipeline combining compile-time schema tracking with deferred execution.
* Add `fromCsv` function for parsing a CSV string directly into a DataFrame.
* Add `DataKinds` extension and `DataFrame.Typed` import to the GHCi file for easier interactive typed dataframe workflows.
* Add BROTLI page decompression for Parquet reads and document the currently supported Parquet compression codecs.

### Performance
* Specialize and inline aggregation functions (`sum`, `mean`, `variance`, `median`, `stddev`, etc.) to avoid expensive numeric conversions at runtime.
Expand Down
4 changes: 4 additions & 0 deletions dataframe.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ library
DataFrame.Typed.Expr,
DataFrame.Typed.Lazy,
DataFrame.Typed
other-modules: DataFrame.IO.Parquet.Brotli
build-depends: base >= 4 && <5,
deepseq >= 1 && < 2,
aeson >= 0.11.0.0 && < 3,
Expand Down Expand Up @@ -143,6 +144,8 @@ library
http-conduit >= 2.3 && < 3,
streamly-core >= 0.2.3 && < 0.4,
streamly-bytestring >= 0.2.0 && < 0.4
if !os(windows)
build-depends: unix >= 2 && < 3
hs-source-dirs: src
default-language: Haskell2010

Expand Down Expand Up @@ -267,6 +270,7 @@ test-suite tests
Operations.Typing,
LazyParquet,
Parquet,
ParquetTestHelpers,
ParquetTestData,
Properties,
Monad
Expand Down
3 changes: 3 additions & 0 deletions examples/examples.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ executable examples
DataFrame.IO.JSON,
DataFrame.IO.Parquet,
DataFrame.IO.Parquet.Binary,
DataFrame.IO.Parquet.Brotli,
DataFrame.IO.Parquet.Dictionary,
DataFrame.IO.Parquet.Levels,
DataFrame.IO.Parquet.Thrift,
Expand Down Expand Up @@ -133,6 +134,8 @@ executable examples
stm >= 2.5 && < 3,
filepath >= 1.4 && < 2,
Glob >= 0.10 && < 1,
if !os(windows)
build-depends: unix >= 2 && < 3
if impl(ghc >= 9.12)
build-depends: ghc-typelits-natnormalise == 0.9.3
else
Expand Down
5 changes: 5 additions & 0 deletions src/DataFrame/IO/Parquet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ defaultParquetReadOptions =
@
ghci> D.readParquet ".\/data\/mtcars.parquet"
@

Current page decompression supports Parquet files using
@UNCOMPRESSED@, @SNAPPY@, @GZIP@, @ZSTD@, and @BROTLI@ codecs.
@LZ4@ and @LZ4_RAW@ pages still fail with an unsupported compression error.
BROTLI pages require the @libbrotlidec@ shared library to be available at runtime.
-}
readParquet :: FilePath -> IO DataFrame
readParquet = readParquetWithOpts defaultParquetReadOptions
Expand Down
146 changes: 146 additions & 0 deletions src/DataFrame/IO/Parquet/Brotli.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
{-# LANGUAGE CPP #-}

module DataFrame.IO.Parquet.Brotli (decompress) where

import qualified Data.ByteString as BS

#ifdef mingw32_HOST_OS
-- | Windows stub: this build loads @libbrotlidec@ dynamically via the @unix@
-- package, which is unavailable on Windows, so BROTLI pages are a hard error.
decompress :: Int -> BS.ByteString -> IO BS.ByteString
decompress _ _ =
  error
    "BROTLI decompression requires libbrotlidec and is not supported on Windows in this build"
#else
import Control.Exception (SomeException, try)
import qualified Data.ByteString.Internal as BSI
import qualified Data.ByteString.Unsafe as BSU
import Data.List (intercalate)
import Data.Word (Word8)
import Foreign.C.Types (CInt (..), CSize (..))
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.Marshal.Alloc (alloca, allocaBytes)
import Foreign.Ptr (FunPtr, Ptr, castPtr)
import Foreign.Storable (peek, poke)
import System.IO.Unsafe (unsafePerformIO)
import System.Posix.DynamicLinker (DL, RTLDFlags (RTLD_NOW), dlclose, dlopen, dlsym)

-- | A dynamically loaded handle to @libbrotlidec@ plus the single entry point
-- we call through it.  The 'DL' handle is kept open for the life of the
-- process once loading succeeds (see 'loadBrotliDecoder').
data BrotliDecoder = BrotliDecoder
  { _decoderHandle :: DL
  , decoderDecompress :: BrotliDecoderDecompressFn
  }

-- | Haskell shape of C @BrotliDecoderDecompress@:
-- @(encoded_size, encoded_buffer, decoded_size (in\/out), decoded_buffer)@.
-- NOTE(review): argument order assumed to match brotli's @decode.h@ — confirm
-- against the installed library version.
type BrotliDecoderDecompressFn =
  CSize -> Ptr Word8 -> Ptr CSize -> Ptr Word8 -> IO CInt

-- | Turn the raw 'FunPtr' obtained from 'dlsym' into a callable Haskell
-- function (standard @\"dynamic\"@ foreign-import wrapper).
foreign import ccall unsafe "dynamic"
  mkBrotliDecoderDecompressFn ::
    FunPtr BrotliDecoderDecompressFn -> BrotliDecoderDecompressFn

-- | Process-global decoder, shared by every call to 'decompress'.
-- 'unsafePerformIO' + NOINLINE ensures 'loadBrotliDecoder' runs at most once;
-- a 'Left' (library missing) is cached too, so every use reports the same
-- load failure without retrying.
brotliDecoder :: Either String BrotliDecoder
brotliDecoder = unsafePerformIO loadBrotliDecoder
{-# NOINLINE brotliDecoder #-}

-- | Library names probed, in order, when loading brotli's decoder.
-- Bare sonames defer to the system loader's search path; the absolute paths
-- cover common Homebrew and manual installs on macOS.
brotliLibraryCandidates :: [FilePath]
brotliLibraryCandidates = linuxCandidates ++ macCandidates
  where
    -- Versioned soname first: present even without the -dev package installed.
    linuxCandidates =
      [ "libbrotlidec.so.1"
      , "libbrotlidec.so"
      ]
    macCandidates =
      [ "libbrotlidec.dylib"
      , "/opt/homebrew/opt/brotli/lib/libbrotlidec.1.dylib"
      , "/opt/homebrew/lib/libbrotlidec.dylib"
      , "/usr/local/lib/libbrotlidec.dylib"
      ]

-- | Try each entry of 'brotliLibraryCandidates' in order, accumulating the
-- error from every failed attempt so the final message shows the whole
-- search trail.  On success the 'DL' handle is deliberately left open for
-- the lifetime of the process.
loadBrotliDecoder :: IO (Either String BrotliDecoder)
loadBrotliDecoder = go brotliLibraryCandidates []
  where
    -- All candidates exhausted: report every path tried and why it failed.
    -- errorsSeen is accumulated newest-first, hence the 'reverse'.
    go [] errorsSeen =
      pure $
        Left $
          unlines
            [ "Unable to load libbrotlidec for Parquet BROTLI decoding."
            , "Tried: " ++ intercalate ", " brotliLibraryCandidates
            , "Errors:"
            , unlines (map (" " ++) (reverse errorsSeen))
            ]
    go (candidate : rest) errorsSeen = do
      -- dlopen can throw (e.g. file not found); treat any exception as
      -- "this candidate failed" and move on.
      opened <- try (dlopen candidate [RTLD_NOW]) :: IO (Either SomeException DL)
      case opened of
        Left err ->
          go rest (formatError candidate err : errorsSeen)
        Right handle -> do
          symbolResult <-
            try (dlsym handle "BrotliDecoderDecompress") ::
              IO (Either SomeException (FunPtr BrotliDecoderDecompressFn))
          case symbolResult of
            Left err -> do
              -- Library loaded but lacks the symbol we need: release it
              -- before probing the next candidate.
              dlclose handle
              go rest (formatError candidate err : errorsSeen)
            Right fnPtr ->
              pure $
                Right $
                  BrotliDecoder
                    { _decoderHandle = handle
                    , decoderDecompress = mkBrotliDecoderDecompressFn fnPtr
                    }

    -- One line per failed candidate for the aggregate error report.
    formatError candidate err = candidate ++ ": " ++ show err

-- | Return code for a fully successful decode — @BROTLI_DECODER_RESULT_SUCCESS@
-- (value 1) in brotli's @decode.h@.
brotliDecoderSuccess :: CInt
brotliDecoderSuccess = 1

-- | Decompress a BROTLI-compressed Parquet page.
--
-- @expectedSize@ is the uncompressed page size taken from the page header;
-- the decode is rejected unless it produces exactly that many bytes.  All
-- failure modes (library missing, negative size, decoder failure, size
-- mismatch) are raised with 'error', matching the surrounding Parquet
-- reader's error style.
decompress :: Int -> BS.ByteString -> IO BS.ByteString
decompress expectedSize compressed
  | expectedSize < 0 =
      error ("BROTLI decompression requires a non-negative size, got " ++ show expectedSize)
  | otherwise =
      case brotliDecoder of
        Left err -> error err
        Right decoder ->
          -- Zero-copy view of the input; assumes the C decoder treats the
          -- encoded buffer as read-only (brotli declares it const).
          BSU.unsafeUseAsCStringLen compressed $ \(inputPtr, inputLen) ->
            withOutputBuffer expectedSize $ \outputPtr -> do
              actualSize <-
                runDecoder
                  decoder
                  (fromIntegral inputLen)
                  (castPtr inputPtr)
                  outputPtr
                  expectedSize
              -- Runs inside withOutputBuffer so a mismatch aborts before the
              -- result ByteString is ever returned.
              validateDecodedSize expectedSize actualSize

-- | Allocate a destination buffer of @size@ bytes, hand it to @fill@, and
-- return its contents as a 'BS.ByteString'.
--
-- For @size == 0@ a one-byte scratch allocation is still made so the callback
-- always receives a non-null pointer (presumably required by the C decoder —
-- confirm), and the empty string is returned without reading the scratch byte.
withOutputBuffer :: Int -> (Ptr Word8 -> IO ()) -> IO BS.ByteString
withOutputBuffer size fill
  | size == 0 =
      allocaBytes 1 $ \scratch -> do
        fill scratch
        pure BS.empty
  | otherwise = do
      destination <- BSI.mallocByteString size
      withForeignPtr destination fill
      pure (BSI.fromForeignPtr destination 0 size)

-- | Invoke the loaded @BrotliDecoderDecompress@ once.
--
-- The size cell is an in\/out parameter: it is seeded with the capacity of
-- the output buffer and read back as the number of bytes actually written.
-- Aborts via 'validateDecoderResult' when the decoder reports failure.
runDecoder :: BrotliDecoder -> CSize -> Ptr Word8 -> Ptr Word8 -> Int -> IO Int
runDecoder decoder inputLen inputPtr outputPtr expectedSize =
  alloca $ \sizeCell -> do
    -- Seed with the full capacity of the destination buffer.
    poke sizeCell (fromIntegral expectedSize)
    status <- decoderDecompress decoder inputLen inputPtr sizeCell outputPtr
    validateDecoderResult status
    writtenBytes <- peek sizeCell
    pure (fromIntegral writtenBytes)

-- | Abort with a descriptive 'error' unless the decoder reported success.
validateDecoderResult :: CInt -> IO ()
validateDecoderResult status =
  if status == brotliDecoderSuccess
    then pure ()
    else
      error
        ("BROTLI decompression failed with result code " ++ show status)

-- | Ensure the decoder produced exactly the byte count promised by the page
-- header; anything else indicates corruption or a mislabeled header.
validateDecodedSize :: Int -> Int -> IO ()
validateDecodedSize expectedSize actualSize
  | expectedSize == actualSize = pure ()
  | otherwise =
      error
        ( "BROTLI decompressed size mismatch: expected "
            ++ show expectedSize
            ++ " bytes, got "
            ++ show actualSize
        )
#endif
2 changes: 2 additions & 0 deletions src/DataFrame/IO/Parquet/Page.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import Data.Int
import Data.Maybe (fromMaybe)
import qualified Data.Vector.Unboxed as VU
import DataFrame.IO.Parquet.Binary
import qualified DataFrame.IO.Parquet.Brotli as Brotli
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
import DataFrame.Internal.Binary (
Expand Down Expand Up @@ -61,6 +62,7 @@ readPage c columnBytes =
Right res -> pure res
UNCOMPRESSED -> pure compressed
GZIP -> pure (LB.toStrict (GZip.decompress (BS.fromStrict compressed)))
BROTLI -> Brotli.decompress (fromIntegral (uncompressedPageSize hdr)) compressed
other -> error ("Unsupported compression type: " ++ show other)
pure
( Just $ Page hdr fullData
Expand Down
52 changes: 46 additions & 6 deletions tests/Parquet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,19 @@ import qualified DataFrame as D
import qualified DataFrame.Functions as F
import qualified DataFrame.IO.Parquet as DP
import ParquetTestData (allTypes, mtCarsDataset, tinyPagesLast10, transactions)
import ParquetTestHelpers (
assertFirstColumnCodec,
buildDataPageV1,
encodePlainInt32Payload,
)

import qualified Data.ByteString as BS
import Data.Int
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Word
import qualified Data.Vector.Unboxed as VU
import Data.Word (Word32, Word64)
import DataFrame.IO.Parquet.Page (readNInt32Vec, readPage)
import DataFrame.IO.Parquet.Thrift (
columnMetaData,
columnPathInSchema,
Expand All @@ -23,7 +30,14 @@ import DataFrame.IO.Parquet.Thrift (
rowGroups,
schema,
)
import DataFrame.IO.Parquet.Types (columnNullCount)
import DataFrame.IO.Parquet.Types (
CompressionCodec (BROTLI),
Page (pageBytes, pageHeader),
PageHeader (pageHeaderPageType, pageTypeHeader),
PageType (DATA_PAGE),
PageTypeHeader (DataPageHeader, dataPageHeaderNumValues),
columnNullCount,
)
import DataFrame.Internal.Binary (
littleEndianWord32,
littleEndianWord64,
Expand Down Expand Up @@ -427,12 +441,37 @@ concatenatedGzipMembers =
-- | Metadata-only regression for the large BROTLI fixture: asserts the first
-- column chunk's codec is BROTLI without decoding any pages.  Per
-- tests/data/README.md, materializing this file's values would turn the test
-- into a memory stress test (the combined uncompressed chunk exceeds 2GB).
largeBrotliMap :: Test
largeBrotliMap =
  TestCase
    ( assertFirstColumnCodec
        "largeBrotliMap codec"
        BROTLI
        "./tests/data/large_string_map.brotli.parquet"
    )

-- | End-to-end decode of a hand-built BROTLI DATA_PAGE (v1): wraps a
-- pre-compressed PLAIN int32 payload in a synthetic page header, runs it
-- through 'readPage', and checks the header fields and decoded values.
brotliPageReader :: Test
brotliPageReader =
  TestCase $ do
    let expectedPayload = encodePlainInt32Payload [1, 2, 3]
    -- 16-byte brotli stream whose decompressed form is expectedPayload
    -- (12 bytes: PLAIN little-endian [1,2,3]).  Presumably generated offline
    -- with a brotli encoder — confirm provenance before regenerating.
    let compressedPayload = BS.pack [31, 11, 0, 248, 167, 1, 2, 4, 6, 86, 10, 162, 4, 0, 194, 30]
    let encodedPage = buildDataPageV1 3 expectedPayload compressedPayload
    (maybePage, remainder) <- readPage BROTLI encodedPage
    -- The page should consume the entire input.
    assertEqual "brotliPageReader remainder" BS.empty remainder
    case maybePage of
      Nothing -> assertFailure "brotliPageReader: expected a decoded page"
      Just page -> do
        assertEqual
          "brotliPageReader page type"
          DATA_PAGE
          (pageHeaderPageType (pageHeader page))
        case pageTypeHeader (pageHeader page) of
          DataPageHeader{dataPageHeaderNumValues = numValues} ->
            assertEqual "brotliPageReader num values" 3 numValues
          other ->
            assertFailure ("brotliPageReader: expected DataPageHeader, got " ++ show other)
        -- pageBytes must now hold the decompressed PLAIN payload.
        assertEqual
          "brotliPageReader payload"
          [1 :: Int32, 2, 3]
          (VU.toList (readNInt32Vec 3 (pageBytes page)))

-- ---------------------------------------------------------------------------
-- Group 3: Delta / RLE encodings (unsupported → error tests)
-- ---------------------------------------------------------------------------
Expand Down Expand Up @@ -1118,6 +1157,7 @@ tests =
, lz4RawCompressedLarger
, concatenatedGzipMembers
, largeBrotliMap
, brotliPageReader
, -- Group 3: delta / rle encodings
deltaBinaryPacked
, deltaByteArray
Expand Down
73 changes: 73 additions & 0 deletions tests/ParquetTestHelpers.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
module ParquetTestHelpers (
assertFirstColumnCodec,
buildDataPageV1,
encodePlainInt32Payload,
) where

import Data.Bits (shiftL, shiftR, xor, (.&.), (.|.))
import qualified Data.ByteString as BS
import Data.Int
import Data.Word
import qualified DataFrame.IO.Parquet as DP
import DataFrame.IO.Parquet.Thrift (
columnCodec,
columnMetaData,
compactI32,
compactStruct,
rowGroupColumns,
rowGroups,
)
import DataFrame.IO.Parquet.Types (CompressionCodec)
import DataFrame.Internal.Binary (word32ToLittleEndian)
import Test.HUnit (Assertion, assertEqual, assertFailure)

-- | Read only the Parquet footer of @path@ and assert that the first column
-- chunk of the first row group declares the @expected@ compression codec.
-- No page data is decoded, so this stays cheap even for huge fixtures.
assertFirstColumnCodec :: String -> CompressionCodec -> FilePath -> Assertion
assertFirstColumnCodec label expected path = do
  (metadata, _) <- DP.readMetadataFromPath path
  case rowGroups metadata of
    firstGroup : _ ->
      case rowGroupColumns firstGroup of
        firstChunk : _ ->
          assertEqual label expected (columnCodec (columnMetaData firstChunk))
        [] ->
          assertFailure (label ++ ": first row group has no columns")
    [] ->
      assertFailure (label ++ ": parquet file has no row groups")

-- | Serialize a minimal Thrift compact-protocol @PageHeader@ for a v1
-- DATA_PAGE and append the (already) compressed payload after it.
--
-- Field layout (compact protocol: each header byte is
-- @(field-id delta << 4) | wire type@; i32 values are zig-zag varints):
--
--   * field 1: page type                  — 0 (DATA_PAGE)
--   * field 2: uncompressed_page_size     — length of the PLAIN payload
--   * field 3: compressed_page_size       — length of the compressed payload
--   * field 5 (delta 2, skipping crc): data_page_header struct:
--       field 1: num_values
--       field 2: encoding                   — 0 (PLAIN)
--       field 3: definition_level_encoding  — 0
--       field 4: repetition_level_encoding  — 0
--   * 0x00 0x00 — STOP bytes for the inner struct and the outer PageHeader.
buildDataPageV1 :: Int32 -> BS.ByteString -> BS.ByteString -> BS.ByteString
buildDataPageV1 numValues payload compressedPayload =
  BS.pack
    ( field 1 compactI32 (zigZag32 0)
        ++ field 1 compactI32 (zigZag32 (fromIntegral (BS.length payload)))
        ++ field 1 compactI32 (zigZag32 (fromIntegral (BS.length compressedPayload)))
        ++ [fieldHeader 2 compactStruct]
        ++ field 1 compactI32 (zigZag32 numValues)
        ++ field 1 compactI32 (zigZag32 0)
        ++ field 1 compactI32 (zigZag32 0)
        ++ field 1 compactI32 (zigZag32 0)
        ++ [0x00, 0x00]
    )
    <> compressedPayload

-- | PLAIN-encode 32-bit integers: each value as four little-endian bytes,
-- concatenated in order.
encodePlainInt32Payload :: [Int32] -> BS.ByteString
encodePlainInt32Payload values =
  BS.concat [word32ToLittleEndian (fromIntegral value) | value <- values]

-- | A compact-protocol field: its header byte followed by the encoded value.
field :: Word8 -> Word8 -> [Word8] -> [Word8]
field idDelta wireType body = fieldHeader idDelta wireType : body

-- | Compact-protocol short-form field header byte:
-- upper nibble is the field-id delta, lower nibble the wire type.
fieldHeader :: Word8 -> Word8 -> Word8
fieldHeader idDelta wireType = shiftL idDelta 4 .|. wireType

-- | Zig-zag-map a signed 32-bit value onto an unsigned one
-- (0→0, -1→1, 1→2, -2→3, …) and emit it as a Thrift varint.
zigZag32 :: Int32 -> [Word8]
zigZag32 value = encodeVarInt (fromIntegral zigZagged)
  where
    zigZagged :: Word32
    -- Arithmetic shiftR 31 smears the sign bit across the whole word,
    -- so the xor flips every bit exactly when the input is negative.
    zigZagged =
      (fromIntegral value `shiftL` 1) `xor` fromIntegral (value `shiftR` 31)

-- | Little-endian base-128 varint: 7 payload bits per byte, high bit set on
-- every byte except the last.
encodeVarInt :: Word64 -> [Word8]
encodeVarInt = go
  where
    go value
      | value < 0x80 = [fromIntegral value]
      | otherwise =
          fromIntegral ((value .&. 0x7F) .|. 0x80) : go (value `shiftR` 7)
5 changes: 5 additions & 0 deletions tests/data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,11 @@ It is meant to exercise reading of structured data where each value
is smaller than 2GB but the combined uncompressed column chunk size
is greater than 2GB.

The repo keeps this fixture to verify BROTLI metadata/plumbing for a
pathological structured column chunk. End-to-end tests should avoid
materializing the full values, since doing so turns the regression into
a memory stress test rather than a codec test.

## Float16 Files

The files `float16_zeros_and_nans.parquet` and `float16_nonzeros_and_nans.parquet`
Expand Down
Loading