Skip to content

Commit

Permalink
Add input and output in scaling config; Refactor JobRateLimitAlgorith…
Browse files Browse the repository at this point in the history
…m SPI; Add rateLimiter TPS impl (apache#14621)
  • Loading branch information
sandynz committed Jan 8, 2022
1 parent 791f780 commit b4d9801
Show file tree
Hide file tree
Showing 25 changed files with 446 additions and 144 deletions.
10 changes: 0 additions & 10 deletions docs/document/content/dev-manual/scaling.cn.md
Expand Up @@ -17,16 +17,6 @@ chapter = true
| PostgreSQLScalingEntry | 基于 PostgreSQL 的弹性伸缩入口 |
| OpenGaussScalingEntry | 基于 openGauss 的弹性伸缩入口 |

## JobRateLimitAlgorithm

| *SPI 名称* | *详细说明* |
| ------------------------------------------- | ------------------------------------------- |
| JobRateLimitAlgorithm | 任务限流算法 |

| *已知实现类* | *详细说明* |
| ------------------------------------------- | ------------------------------------------- |
| SourceJobRateLimitAlgorithm | 源端限流算法 |

## JobCompletionDetectAlgorithm

| *SPI 名称* | *详细说明* |
Expand Down
10 changes: 0 additions & 10 deletions docs/document/content/dev-manual/scaling.en.md
Expand Up @@ -17,16 +17,6 @@ chapter = true
| PostgreSQLScalingEntry | PostgreSQL entry of scaling |
| OpenGaussScalingEntry | openGauss entry of scaling |

## JobRateLimitAlgorithm

| *SPI Name* | *Description* |
| ------------------------------------------- | ------------------------------------------- |
| JobRateLimitAlgorithm | job rate limit algorithm |

| *Implementation Class* | *Description* |
| ------------------------------------------- | ------------------------------------------- |
| SourceJobRateLimitAlgorithm | rate limit algorithm for source side |

## JobCompletionDetectAlgorithm

| *SPI Name* | *Description* |
Expand Down
Expand Up @@ -54,12 +54,20 @@ rules:
scaling:
<scaling-action-config-name> (+):
blockQueueSize: # 数据通道阻塞队列大小
workerThread: # 给全量数据摄取和数据导入使用的工作线程池大小
readBatchSize: # 一次查询操作返回的最大记录数
rateLimiter: # 限流算法
type: # 算法类型。可选项:SOURCE
props: # 算法属性
qps: # QPS属性。适用算法类型:SOURCE
input:
workerThread: # 从源端摄取全量数据的线程池大小
batchSize: # 一次查询操作返回的最大记录数
rateLimiter: # 限流算法
type: # 算法类型。可选项:QPS
props: # 算法属性
qps: # qps属性。适用算法类型:QPS
output:
workerThread: # 数据导入到目标端的线程池大小
batchSize: # 一次批量写入操作的最大记录数
rateLimiter: # 限流算法
type: # 算法类型。可选项:TPS
props: # 算法属性
tps: # tps属性。适用算法类型:TPS
completionDetector: # 作业是否接近完成检测算法。如果不配置,那么系统无法自动进行后续步骤,可以通过 DistSQL 手动操作。
type: # 算法类型。可选项:IDLE
props: # 算法属性
Expand All @@ -80,12 +88,20 @@ rules:
scaling:
default_scaling:
blockQueueSize: 10000
workerThread: 40
readBatchSize: 1000
rateLimiter:
type: SOURCE
props:
qps: 50
input:
workerThread: 40
batchSize: 1000
rateLimiter:
type: QPS
props:
qps: 50
output:
workerThread: 40
batchSize: 1000
rateLimiter:
type: TPS
props:
tps: 2000
completionDetector:
type: IDLE
props:
Expand Down
Expand Up @@ -53,12 +53,20 @@ rules:
scaling:
<scaling-action-config-name> (+):
blockQueueSize: # Data channel blocking queue size
workerThread: # Worker thread pool size for inventory data ingestion and data importing
readBatchSize: # Maximum records count of a query operation returning
rateLimiter: # Rate limit algorithm
type: # Algorithm type. Options: SOURCE
props: # Algorithm properties
qps: # QPS property. Available for types: SOURCE
input:
workerThread: # Worker thread pool size for inventory data ingestion from source
batchSize: # Maximum records count of a DML select operation
rateLimiter: # Rate limit algorithm
type: # Algorithm type. Options: QPS
props: # Algorithm properties
qps: # QPS property. Available for types: QPS
output:
workerThread: # Worker thread pool size for data importing to target
batchSize: # Maximum records count of a DML insert/delete/update operation
rateLimiter: # Rate limit algorithm
type: # Algorithm type. Options: TPS
props: # Algorithm properties
tps: # TPS property. Available for types: TPS
completionDetector: # Completion detect algorithm. If it's not configured, then system won't continue to do next steps automatically.
type: # Algorithm type. Options: IDLE
props: # Algorithm properties
Expand All @@ -79,12 +87,20 @@ rules:
scaling:
default_scaling:
blockQueueSize: 10000
workerThread: 40
readBatchSize: 1000
rateLimiter:
type: SOURCE
props:
qps: 50
input:
workerThread: 40
batchSize: 1000
rateLimiter:
type: QPS
props:
qps: 50
output:
workerThread: 40
batchSize: 1000
rateLimiter:
type: TPS
props:
tps: 2000
completionDetector:
type: IDLE
props:
Expand Down
Expand Up @@ -19,12 +19,20 @@ dataConverterName: default_convert
dataConverters:
default_convert:
blockQueueSize: 10000
workerThread: 40
readBatchSize: 1000
rateLimiter:
type: SOURCE
props:
qps: 50
input:
workerThread: 40
batchSize: 1000
rateLimiter:
type: QPS
props:
qps: 50
output:
workerThread: 40
batchSize: 1000
rateLimiter:
type: TPS
props:
tps: 2000
completionDetector:
type: IDLE
props:
Expand Down
Expand Up @@ -109,12 +109,20 @@ rules:
scaling:
default_scaling:
blockQueueSize: 10000
workerThread: 40
readBatchSize: 1000
rateLimiter:
type: SOURCE
props:
qps: 50
input:
workerThread: 40
batchSize: 1000
rateLimiter:
type: QPS
props:
qps: 50
output:
workerThread: 40
batchSize: 1000
rateLimiter:
type: TPS
props:
tps: 2000
completionDetector:
type: IDLE
props:
Expand Down
Expand Up @@ -19,12 +19,20 @@ scalingName: default_scaling
scaling:
default_scaling:
blockQueueSize: 10000
workerThread: 40
readBatchSize: 1000
rateLimiter:
type: SOURCE
props:
qps: 50
input:
workerThread: 40
batchSize: 1000
rateLimiter:
type: QPS
props:
qps: 50
output:
workerThread: 40
batchSize: 1000
rateLimiter:
type: TPS
props:
tps: 2000
completionDetector:
type: IDLE
props:
Expand Down
Expand Up @@ -19,24 +19,48 @@

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;

/**
* On rule altered action configuration.
*/
@RequiredArgsConstructor
@Getter
@ToString
public final class OnRuleAlteredActionConfiguration {

private final int blockQueueSize;

private final int workerThread;
private final InputConfiguration input;

private final int readBatchSize;

private final ShardingSphereAlgorithmConfiguration rateLimiter;
private final OutputConfiguration output;

private final ShardingSphereAlgorithmConfiguration completionDetector;

private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;

@RequiredArgsConstructor
@Getter
@ToString
public static final class InputConfiguration {

private final int workerThread;

private final int batchSize;

private final ShardingSphereAlgorithmConfiguration rateLimiter;
}

@RequiredArgsConstructor
@Getter
@ToString
public static final class OutputConfiguration {

private final int workerThread;

private final int batchSize;

private final ShardingSphereAlgorithmConfiguration rateLimiter;
}
}
Expand Up @@ -17,8 +17,10 @@

package org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlShardingSphereAlgorithmConfiguration;

Expand All @@ -27,17 +29,36 @@
*/
@Getter
@Setter
@ToString
public final class YamlOnRuleAlteredActionConfiguration implements YamlConfiguration {

private int blockQueueSize = 10000;

private int workerThread = 40;
private YamlInputConfiguration input;

private int readBatchSize = 1000;

private YamlShardingSphereAlgorithmConfiguration rateLimiter;
private YamlOutputConfiguration output;

private YamlShardingSphereAlgorithmConfiguration completionDetector;

private YamlShardingSphereAlgorithmConfiguration dataConsistencyChecker;

@Data
public static final class YamlInputConfiguration implements YamlConfiguration {

private int workerThread = 40;

private int batchSize = 1000;

private YamlShardingSphereAlgorithmConfiguration rateLimiter;
}

@Data
public static final class YamlOutputConfiguration implements YamlConfiguration {

private int workerThread = 40;

private int batchSize = 1000;

private YamlShardingSphereAlgorithmConfiguration rateLimiter;
}
}

0 comments on commit b4d9801

Please sign in to comment.