
parquet file created by .sink_parquet got Invalid thrift: protocol error #16262

Open
2 tasks done
dui1234 opened this issue May 16, 2024 · 0 comments
Labels: bug (Something isn't working), needs triage (Awaiting prioritization by a maintainer), python (Related to Python Polars)

dui1234 commented May 16, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import polars as pl

# data_path points at the source parquet file
valid_trxn = (
    pl.scan_parquet(data_path)
    .filter(pl.col("block_number").is_between(2912406, 9354105))
    .filter(pl.col("success") & pl.col("to_address").is_not_null())
    .sink_parquet("valid_trxn_160wk.parquet", compression="lz4")
)

regular_sender = (
    pl.scan_parquet(data_path)
    .filter(pl.col("block_number").is_between(2912406, 9354105))
    .filter(pl.col("success") & pl.col("to_address").is_not_null())
    .group_by(pl.col("from_address"))
    .agg(pl.count())
    .filter(pl.col("count") > 1)
    .sink_parquet("reg_sender_160wk.parquet", compression="lz4")
)

regular_receiver = (
    pl.scan_parquet(data_path)
    .filter(pl.col("block_number").is_between(2912406, 9354105))
    .filter(pl.col("success") & pl.col("to_address").is_not_null())
    .group_by(pl.col("to_address"))
    .agg(pl.count())
    .filter(pl.col("count") > 1)
    .sink_parquet("reg_receiver_160wk.parquet", compression="lz4")
)

considered_trxn = (
    pl.scan_parquet("valid_trxn_160wk.parquet")
    .join(pl.scan_parquet("reg_sender_160wk.parquet"), left_on="from_address", right_on="from_address", how="inner")
    .join(pl.scan_parquet("reg_receiver_160wk.parquet"), left_on="to_address", right_on="to_address", how="inner")
    .sink_parquet("considered_trxn_160wk.parquet", compression="lz4")
)

Log output

No response

Issue description

I am trying to produce a joined parquet file where the columns "from_address" and "to_address" of "valid_trxn_160wk.parquet" match the respective columns in "reg_sender_160wk.parquet" and "reg_receiver_160wk.parquet". However, when I try to access the result file, "considered_trxn_160wk.parquet", I get "Invalid thrift: protocol error". I checked whether any of the three input files is corrupted by reading each of them, and they all read fine. Also, opening the three files and running the query with

considered_trxn = (pl.scan_parquet("valid_trxn_160wk.parquet")
              .join(pl.scan_parquet("reg_sender_160wk.parquet"), left_on="from_address", right_on="from_address", how="inner")
              .join(pl.scan_parquet("reg_receiver_160wk.parquet"), left_on="to_address", right_on="to_address", how="inner")
              .collect(streaming=True)
)

works perfectly fine, except that the memory usage is huge and consumes almost all of my memory capacity; that is why I use sink_parquet instead.
So I am not sure what happened here.
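For anyone triaging this: "Invalid thrift: protocol error" generally means the Parquet footer metadata could not be parsed. A minimal stdlib-only sketch for checking whether the sunk file at least ends with an intact footer (the helper name `check_parquet_footer` is made up here, not a Polars API; a valid Parquet file ends with a 4-byte little-endian footer length followed by the magic bytes `PAR1`):

```python
import struct

def check_parquet_footer(path):
    """Return the footer length a Parquet file claims to have.

    A valid Parquet file ends with [4-byte little-endian footer length]
    followed by the magic bytes b"PAR1". An "Invalid thrift: protocol
    error" usually means this footer metadata is truncated or corrupt,
    so a quick tail inspection narrows down whether the file itself is
    damaged. Raises ValueError when the trailing magic bytes are missing.
    """
    with open(path, "rb") as f:
        f.seek(-8, 2)  # read the last 8 bytes: footer length + magic
        tail = f.read(8)
    if tail[4:] != b"PAR1":
        raise ValueError("trailing PAR1 magic missing: truncated or not Parquet")
    return struct.unpack("<I", tail[:4])[0]
```

If this raises on "considered_trxn_160wk.parquet" but not on the three input files, the sink produced a truncated file.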

Error


ComputeError Traceback (most recent call last)
Cell In[4], line 1
----> 1 considered_trxn_r = (pl.scan_parquet("considered_trxn_160wk.parquet")
2 .with_columns(('0x'+ pl.col("transaction_hash").bin.encode("hex")).alias("encoded_th"))
3 .with_columns(('0x'+ pl.col("from_address").bin.encode("hex")).alias("from"))
4 .with_columns(('0x'+ pl.col("to_address").bin.encode("hex")).alias("to"))
5 .select(pl.col("block_number","encoded_th", "value_f64", "gas_used", "from", "to",
6 "n_input_bytes", "n_input_zero_bytes", "n_input_nonzero_bytes"))
7 .collect(streaming=True)
8 )

File ~/jax_env/lib/python3.10/site-packages/polars/_utils/deprecation.py:134, in deprecate_renamed_parameter.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
129 @wraps(function)
130 def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
131 _rename_keyword_argument(
132 old_name, new_name, kwargs, function.__name__, version
133 )
--> 134 return function(*args, **kwargs)

File ~/jax_env/lib/python3.10/site-packages/polars/_utils/deprecation.py:134, in deprecate_renamed_parameter.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
129 @wraps(function)
130 def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
131 _rename_keyword_argument(
132 old_name, new_name, kwargs, function.__name__, version
133 )
--> 134 return function(*args, **kwargs)

File ~/jax_env/lib/python3.10/site-packages/polars/io/parquet/functions.py:394, in scan_parquet(source, n_rows, row_index_name, row_index_offset, parallel, use_statistics, hive_partitioning, hive_schema, rechunk, low_memory, cache, storage_options, retries)
391 else:
392 source = [normalize_filepath(source) for source in source]
--> 394 return _scan_parquet_impl(
395 source,
396 n_rows=n_rows,
397 cache=cache,
398 parallel=parallel,
399 rechunk=rechunk,
400 row_index_name=row_index_name,
401 row_index_offset=row_index_offset,
402 storage_options=storage_options,
403 low_memory=low_memory,
404 use_statistics=use_statistics,
405 hive_partitioning=hive_partitioning,
406 hive_schema=hive_schema,
407 retries=retries,
408 )

File ~/jax_env/lib/python3.10/site-packages/polars/io/parquet/functions.py:454, in _scan_parquet_impl(source, n_rows, cache, parallel, rechunk, row_index_name, row_index_offset, storage_options, low_memory, use_statistics, hive_partitioning, hive_schema, retries)
450 else:
451 # Handle empty dict input
452 storage_options = None
--> 454 pylf = PyLazyFrame.new_from_parquet(
455 source,
456 sources,
457 n_rows,
458 cache,
459 parallel,
460 rechunk,
461 parse_row_index_args(row_index_name, row_index_offset),
462 low_memory,
463 cloud_options=storage_options,
464 use_statistics=use_statistics,
465 hive_partitioning=hive_partitioning,
466 hive_schema=hive_schema,
467 retries=retries,
468 )
469 return wrap_ldf(pylf)

ComputeError: parquet: File out of specification: Invalid thrift: protocol error

Expected behavior

The file "considered_trxn_160wk.parquet" can be read and further processed with pl.scan_parquet or pl.read_parquet.

Installed versions

--------Version info---------
Polars:               0.20.20
Index type:           UInt32
Platform:             Linux-6.5.0-28-generic-x86_64-with-glibc2.35
Python:               3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               <not installed>
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           3.8.4
nest_asyncio:         1.6.0
numpy:                1.23.5
openpyxl:             <not installed>
pandas:               1.4.4
pyarrow:              15.0.2
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           2.0.29
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>