Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize rows_from_chunks #262

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

mauguignard
Copy link

My team and I are considering moving into Druid. As part of our tests, we were benchmarking some big queries and we found out that the fetch of the rows was taking longer than expected. After some profiling, we traced back the bottleneck to the loop in rows_from_chunks.

The new version introduced in this PR reduced the wall time of this function from 28.70s to only 4.93s (providing an effective speedup of ~5.8x) when running the following benchmark on my laptop (Ubuntu 20.04 - CPython 3.7.10 - Intel i5-8250U):

import argparse
import datetime
import json
import logging
import timeit
from collections import OrderedDict
from random import Random

from pydruid.db.api import rows_from_chunks


STRING_CHOICES = [
    "alice",
    "bob",
    r"ali\"ce",
    "ali{ce",
    r"b\ob",
    "{bob}",
    "{1: 2}",
    r"{\"id\": 1}",
]


def generate_rows():
    seed = 123
    rnd = Random(seed)

    rows = []
    base_timestamp = datetime.datetime(2021, 1, 1).timestamp()

    for i in range(500_000):
        row = OrderedDict(
            [
                (
                    "__time",
                    datetime.datetime.fromtimestamp(
                        rnd.random() * base_timestamp
                    ).isoformat(),
                ),
                ("dimension0", rnd.choice(STRING_CHOICES)),
                ("dimension1", rnd.choice(STRING_CHOICES)),
                ("dimension2", rnd.choice(STRING_CHOICES)),
                ("dimension3", rnd.choice(STRING_CHOICES)),
                ("dimension4", rnd.choice(STRING_CHOICES)),
                ("counter0", rnd.randrange(10_000_000)),
                ("counter1", rnd.randrange(10_000_000)),
                ("counter2", rnd.randrange(10_000_000)),
                ("counter3", rnd.randrange(10_000_000)),
                ("counter4", rnd.randrange(10_000_000)),
                ("counter5", rnd.randrange(10_000_000)),
                ("counter6", rnd.randrange(10_000_000)),
                ("counter7", rnd.randrange(10_000_000)),
                ("counter8", rnd.randrange(10_000_000)),
                ("counter9", rnd.randrange(10_000_000)),
                ("counter0f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter1f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter2f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter3f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter4f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter5f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter6f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter7f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter8f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter9f", rnd.randrange(10_000_000) / 10_000.0),
            ]
        )
        rows.append(row)

    return rows


def generate_chunks(rows, chunk_size=8192):
    body = json.dumps(rows, separators=(",", ":"))
    return [body[i: i + chunk_size] for i in range(0, len(body), chunk_size)]


def verify():
    logging.info("Generating data...")
    rows = generate_rows()

    logging.info("Generating chunks...")
    chunks = generate_chunks(rows)

    logging.info("Parsing chunks...")
    parsed_rows = list(rows_from_chunks(chunks))

    logging.info("Verifying results...")

    assert len(rows) == len(parsed_rows), "The number of rows is not the expected"

    for row, parsed_row in zip(rows, parsed_rows):
        assert tuple(row.items()) == tuple(
            parsed_row.items()
        ), "Rows differ. %r != %r" % (row, parsed_row)


def main(options):
    if options.verify:
        verify()

    logging.info("Starting benchmark...")
    timer = timeit.Timer(
        setup="chunks = generate_chunks(generate_rows())",
        stmt="list(rows_from_chunks(chunks))",
        globals=globals(),
    )

    timings = timer.repeat(repeat=options.repeat, number=1)
    logging.info("Best timing = %f", min(timings))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Benchmark rows_from_chunks.")
    parser.add_argument("--verify", dest="verify", action="store_true")
    parser.add_argument("--no-verify", dest="verify", action="store_false")
    parser.set_defaults(verify=True)

    parser.add_argument("--repeat", "-r", type=int, default=20)

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - [%(levelname)s] - %(name)s:%(module)s:%(funcName)s: %(message)s",
    )
    main(parser.parse_args())

In our tests, however, we had an actual speedup closer to 9x when fetching the rows from one of our test queries.

In Python 3.6 and lower, we cannot avoid the overhead of calling OrderedDict. In our tests, though, the speedup is still close to 4x.

As a side effect, this PR should also solve #242 , since the parsing is now completely delegated to the official json module.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant