Ytake\KsqlClient [ytake/php-ksql]

Apache Kafka / Confluent KSQL REST Client for PHP

What is KSQL

KSQL is the streaming SQL engine for Apache Kafka.

Install

Requires PHP >= 7.1.

$ composer require ytake/php-ksql

Usage

Request Preset

The following request classes are available:
Ytake\KsqlClient\Query\CommandStatus
Ytake\KsqlClient\Query\Status
Ytake\KsqlClient\Query\ServerInfo
Ytake\KsqlClient\Query\Ksql
Ytake\KsqlClient\Query\Stream (for stream)

Syntax Reference

Get Command Status

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\CommandStatus;
use Ytake\KsqlClient\Computation\CommandId;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new CommandStatus(CommandId::fromString('stream/MESSAGE_STREAM/create'))
)->result();
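
For orientation, the command ID in the example above has three slash-separated parts (entity type, entity name, action), and the KSQL server exposes command statuses under /status/<command id>. The following is a minimal sketch in plain PHP that does not use this library; the helper name commandStatusUrl is hypothetical and the three-part ID format is an assumption based on the example.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of ytake/php-ksql: builds the status URL
 * for a command ID such as "stream/MESSAGE_STREAM/create".
 */
function commandStatusUrl(string $baseUrl, string $commandId): string
{
    $parts = explode('/', $commandId);
    if (count($parts) !== 3 || in_array('', $parts, true)) {
        throw new InvalidArgumentException(
            'Expected "<type>/<name>/<action>", got: ' . $commandId
        );
    }

    // Encode each segment separately so the slashes between them survive.
    $path = implode('/', array_map('rawurlencode', $parts));

    return rtrim($baseUrl, '/') . '/status/' . $path;
}

echo commandStatusUrl('http://localhost:8088', 'stream/MESSAGE_STREAM/create'), PHP_EOL;
```

Validating the ID before building the URL surfaces a malformed command ID locally instead of as an HTTP error from the server.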

Get Statuses

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Status;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(new Status())->result();

Get KSQL Server Information

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\ServerInfo;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(new ServerInfo())->result();

Query KSQL

<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Ksql;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new Ksql('DESCRIBE users_original;')
)->result();
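
To illustrate what a statement request looks like on the wire: the KSQL server's /ksql endpoint accepts a JSON body with a "ksql" statement string and a "streamsProperties" object. The sketch below builds such a body in plain PHP. It is not this library's implementation; the function name ksqlRequestBody is hypothetical, and appending a missing semicolon is a convenience assumed here rather than documented client behaviour.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of ytake/php-ksql: encodes a KSQL statement
 * as a JSON body for the server's /ksql endpoint.
 *
 * @param array<string, string> $streamsProperties optional per-request properties
 */
function ksqlRequestBody(string $statement, array $streamsProperties = []): string
{
    $statement = trim($statement);
    if ($statement === '') {
        throw new InvalidArgumentException('KSQL statement must not be empty.');
    }
    // Terminate the statement with a semicolon if the caller left it off.
    if (substr($statement, -1) !== ';') {
        $statement .= ';';
    }

    $json = json_encode([
        'ksql' => $statement,
        // Cast to object so an empty array encodes as {} rather than [].
        'streamsProperties' => (object) $streamsProperties,
    ]);
    if ($json === false) {
        throw new RuntimeException('JSON encoding failed: ' . json_last_error_msg());
    }

    return $json;
}

echo ksqlRequestBody('DESCRIBE users_original'), PHP_EOL;
// prints {"ksql":"DESCRIBE users_original;","streamsProperties":{}}
```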

Client for Stream Response

<?php

use Ytake\KsqlClient\StreamClient;
use Ytake\KsqlClient\Query\Stream;
use Ytake\KsqlClient\StreamConsumable;
use Ytake\KsqlClient\Entity\StreamedRow;

$client = new StreamClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new Stream(
        'SELECT * FROM testing',
        new class() implements StreamConsumable {
            public function __invoke(StreamedRow $row)
            {
                // stream response consumer
            }
        }
    )
)->result();
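
A streaming query delivers rows incrementally, so the consumer is invoked once per row rather than once per response. The sketch below shows one way chunked, newline-delimited JSON can be split into rows before a callback sees them. It is an illustration in plain PHP, not how StreamClient is implemented; the class name RowBuffer and the row shape in the sample data are assumptions.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical illustration, not the library's implementation: buffers
 * chunks of a newline-delimited JSON stream and hands each complete row
 * to a consumer callback.
 */
final class RowBuffer
{
    /** @var string */
    private $buffer = '';

    /** @var callable */
    private $consumer;

    public function __construct(callable $consumer)
    {
        $this->consumer = $consumer;
    }

    public function append(string $chunk): void
    {
        $this->buffer .= $chunk;
        // Process every complete line; keep a trailing partial line buffered.
        while (($pos = strpos($this->buffer, "\n")) !== false) {
            $line = trim(substr($this->buffer, 0, $pos));
            // Cast guards against substr() returning false on PHP 7.
            $this->buffer = (string) substr($this->buffer, $pos + 1);
            if ($line === '') {
                continue; // skip blank lines
            }
            $row = json_decode($line, true);
            if (is_array($row)) {
                ($this->consumer)($row);
            }
        }
    }
}

$rows = [];
$buffer = new RowBuffer(function (array $row) use (&$rows): void {
    $rows[] = $row;
});

// A row split across two chunks is delivered only once it is complete.
$buffer->append('{"row":{"columns":[1,"a"]}}' . "\n" . '{"row":{"col');
$buffer->append('umns":[2,"b"]}}' . "\n\n");

echo count($rows), PHP_EOL; // prints 2
```

Buffering by line means the callback never sees a partially received row, which keeps consumer code free of reassembly logic.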
