Reuse sourceDataSource and sourceMetaDataLoader in scaling job process (#15190)

* Simplify parameters and fields

* Initialize sourceDataSource and sourceMetaDataLoader in job context

* Reuse sourceDataSource and sourceMetaDataLoader in job process

* Unit test
sandynz committed Jan 30, 2022
1 parent ebeb464 commit e915749
Showing 25 changed files with 228 additions and 117 deletions.
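The change centers on one idea: instead of each dumper opening its own source data source and building its own PipelineTableMetaDataLoader, the job context lazily creates one of each and every task reuses them. Below is a minimal sketch of that sharing pattern using Apache Commons Lang's LazyInitializer, the same utility the diff relies on; SourceDataSource and MetaDataLoader are illustrative stand-ins, not the project's classes.

import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;

public final class SharedSourceResources {

    // Stand-ins for PipelineDataSourceWrapper and PipelineTableMetaDataLoader.
    static final class SourceDataSource {
    }

    static final class MetaDataLoader {

        MetaDataLoader(final SourceDataSource dataSource) {
        }
    }

    // Created once, on first access, no matter how many tasks ask for it.
    private final LazyInitializer<SourceDataSource> sourceDataSourceLazyInitializer = new LazyInitializer<SourceDataSource>() {

        @Override
        protected SourceDataSource initialize() {
            return new SourceDataSource();
        }
    };

    // Built on top of the shared data source, so no extra pool or connection is opened.
    private final LazyInitializer<MetaDataLoader> sourceMetaDataLoaderLazyInitializer = new LazyInitializer<MetaDataLoader>() {

        @Override
        protected MetaDataLoader initialize() throws ConcurrentException {
            return new MetaDataLoader(sourceDataSourceLazyInitializer.get());
        }
    };

    public SourceDataSource getSourceDataSource() throws ConcurrentException {
        return sourceDataSourceLazyInitializer.get();
    }

    public MetaDataLoader getSourceMetaDataLoader() throws ConcurrentException {
        return sourceMetaDataLoaderLazyInitializer.get();
    }
}

Callers that need table metadata later in the job (for example the inventory dumper) just hold the loader reference and resolve it on first use, which is what the lazy getTableMetaData() added in the hunks below does.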
@@ -21,7 +21,7 @@
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;

/**
@@ -32,6 +32,6 @@
public abstract class AbstractIncrementalDumper<P> extends AbstractLifecycleExecutor implements IncrementalDumper {

public AbstractIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<P> position,
final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
}
}
@@ -19,9 +19,11 @@

import lombok.AccessLevel;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
@@ -34,14 +36,14 @@
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -62,29 +64,28 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor

private final JobRateLimitAlgorithm rateLimitAlgorithm;

private final PipelineDataSourceManager dataSourceManager;

private final PipelineTableMetaData tableMetaData;
private final LazyInitializer<PipelineTableMetaData> tableMetaDataLazyInitializer;

private final PipelineChannel channel;

protected AbstractInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
private final DataSource dataSource;

protected AbstractInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
if (!StandardPipelineDataSourceConfiguration.class.equals(inventoryDumperConfig.getDataSourceConfig().getClass())) {
throw new UnsupportedOperationException("AbstractInventoryDumper only support StandardPipelineDataSourceConfiguration");
}
this.inventoryDumperConfig = inventoryDumperConfig;
this.batchSize = inventoryDumperConfig.getBatchSize();
this.rateLimitAlgorithm = inventoryDumperConfig.getRateLimitAlgorithm();
this.dataSourceManager = dataSourceManager;
tableMetaDataLazyInitializer = new LazyInitializer<PipelineTableMetaData>() {
@Override
protected PipelineTableMetaData initialize() {
return metaDataLoader.getTableMetaData(inventoryDumperConfig.getTableName());
}
};
this.channel = channel;
tableMetaData = createTableMetaData();
}

private PipelineTableMetaData createTableMetaData() {
PipelineDataSourceConfiguration dataSourceConfig = inventoryDumperConfig.getDataSourceConfig();
// TODO share PipelineTableMetaDataLoader
PipelineTableMetaDataLoader metaDataManager = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dataSourceConfig));
return metaDataManager.getTableMetaData(inventoryDumperConfig.getTableName());
this.dataSource = dataSource;
}

@Override
Expand All @@ -96,7 +97,7 @@ private void dump() {
String sql = getDumpSQL();
IngestPosition<?> position = inventoryDumperConfig.getPosition();
log.info("inventory dump, sql={}, position={}", sql, position);
try (Connection conn = dataSourceManager.getDataSource(inventoryDumperConfig.getDataSourceConfig()).getConnection()) {
try (Connection conn = dataSource.getConnection()) {
int round = 1;
Number startUniqueKeyValue = getPositionBeginValue(position) - 1;
Optional<Number> maxUniqueKeyValue;
@@ -123,10 +124,16 @@ private String getDumpSQL() {
return "SELECT * FROM " + tableName + " WHERE " + primaryKey + " > ? AND " + primaryKey + " <= ? ORDER BY " + primaryKey + " ASC LIMIT ?";
}

@SneakyThrows(ConcurrentException.class)
private PipelineTableMetaData getTableMetaData() {
return tableMetaDataLazyInitializer.get();
}

private Optional<Number> dump0(final Connection conn, final String sql, final Number startUniqueKeyValue, final int round) throws SQLException {
if (null != rateLimitAlgorithm) {
rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
}
PipelineTableMetaData tableMetaData = getTableMetaData();
try (PreparedStatement preparedStatement = createPreparedStatement(conn, sql)) {
preparedStatement.setObject(1, startUniqueKeyValue);
preparedStatement.setObject(2, getPositionEndValue(inventoryDumperConfig.getPosition()));
@@ -32,6 +32,7 @@
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
@@ -47,16 +48,14 @@
* Incremental task.
*/
@Slf4j
@ToString(exclude = {"incrementalDumperExecuteEngine", "dataSourceManager", "dumper", "progress"})
@ToString(exclude = {"incrementalDumperExecuteEngine", "channel", "dumper", "importers", "progress"})
public final class IncrementalTask extends AbstractLifecycleExecutor implements PipelineTask, AutoCloseable {

@Getter
private final String taskId;

private final ExecuteEngine incrementalDumperExecuteEngine;

private final PipelineDataSourceManager dataSourceManager;

private final PipelineChannel channel;

private final Dumper dumper;
@@ -68,15 +67,14 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements

public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
final PipelineChannelFactory pipelineChannelFactory, final PipelineDataSourceManager dataSourceManager,
final ExecuteEngine incrementalDumperExecuteEngine) {
final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine) {
this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine;
this.dataSourceManager = dataSourceManager;
taskId = dumperConfig.getDataSourceName();
progress = new IncrementalTaskProgress();
IngestPosition<?> position = dumperConfig.getPosition();
progress.setPosition(position);
channel = createChannel(concurrency, pipelineChannelFactory, progress);
dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, dataSourceManager, channel);
dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader);
importers = createImporters(concurrency, importerConfig, dataSourceManager, channel);
}

@@ -32,12 +32,14 @@
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory;

import javax.sql.DataSource;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -46,16 +48,14 @@
* Inventory task.
*/
@Slf4j
@ToString(exclude = {"importerExecuteEngine", "dataSourceManager", "dumper"})
@ToString(exclude = {"importerExecuteEngine", "channel", "dumper", "importer"})
public final class InventoryTask extends AbstractLifecycleExecutor implements PipelineTask, AutoCloseable {

@Getter
private final String taskId;

private final ExecuteEngine importerExecuteEngine;

private final PipelineDataSourceManager dataSourceManager;

private final PipelineChannel channel;

private final Dumper dumper;
@@ -66,12 +66,12 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi

public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
final PipelineChannelFactory pipelineChannelFactory, final PipelineDataSourceManager dataSourceManager,
final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader,
final ExecuteEngine importerExecuteEngine) {
this.importerExecuteEngine = importerExecuteEngine;
this.dataSourceManager = dataSourceManager;
taskId = generateTaskId(inventoryDumperConfig);
channel = createChannel(pipelineChannelFactory);
dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, dataSourceManager, channel);
dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel);
position = inventoryDumperConfig.getPosition();
}
@@ -19,12 +19,17 @@

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;

@@ -60,6 +65,20 @@ public final class RuleAlteredJobContext {

private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();

private final LazyInitializer<PipelineDataSourceWrapper> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSourceWrapper>() {
@Override
protected PipelineDataSourceWrapper initialize() {
return dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig());
}
};

private final LazyInitializer<PipelineTableMetaDataLoader> sourceMetaDataLoaderLazyInitializer = new LazyInitializer<PipelineTableMetaDataLoader>() {
@Override
protected PipelineTableMetaDataLoader initialize() throws ConcurrentException {
return new PipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
}
};

private RuleAlteredJobPreparer jobPreparer;

public RuleAlteredJobContext(final JobConfiguration jobConfig) {
@@ -71,6 +90,26 @@ public RuleAlteredJobContext(final JobConfiguration jobConfig) {
taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig.getPipelineConfig(), jobConfig.getHandleConfig(), ruleAlteredContext.getOnRuleAlteredActionConfig());
}

/**
* Get source data source.
*
* @return source data source
*/
@SneakyThrows(ConcurrentException.class)
public PipelineDataSourceWrapper getSourceDataSource() {
return sourceDataSourceLazyInitializer.get();
}

/**
* Get source metadata loader.
*
* @return source metadata loader
*/
@SneakyThrows(ConcurrentException.class)
public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
return sourceMetaDataLoaderLazyInitializer.get();
}

/**
* Release resources.
*/
@@ -29,6 +29,7 @@
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -64,8 +65,8 @@ public void prepare(final RuleAlteredJobContext jobContext) {
prepareTarget(jobContext.getJobConfig(), dataSourceManager);
initAndCheckDataSource(jobContext);
try {
initIncrementalTasks(jobContext, dataSourceManager);
initInventoryTasks(jobContext, dataSourceManager);
initIncrementalTasks(jobContext);
initInventoryTasks(jobContext);
log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
jobContext.getJobId(), jobContext.getShardingItem(), jobContext.getInventoryTasks(), jobContext.getIncrementalTasks());
} catch (final SQLException ex) {
@@ -112,18 +113,20 @@ private void checkTargetDataSource(final RuleAlteredJobContext jobContext, final
dataSourceChecker.checkTargetTable(targetDataSources, jobContext.getTaskConfig().getImporterConfig().getShardingColumnsMap().keySet());
}

private void initInventoryTasks(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) {
List<InventoryTask> allInventoryTasks = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
private void initInventoryTasks(final RuleAlteredJobContext jobContext) {
List<InventoryTask> allInventoryTasks = inventoryTaskSplitter.splitInventoryData(jobContext);
jobContext.getInventoryTasks().addAll(allInventoryTasks);
}

private void initIncrementalTasks(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) throws SQLException {
private void initIncrementalTasks(final RuleAlteredJobContext jobContext) throws SQLException {
PipelineChannelFactory pipelineChannelFactory = jobContext.getRuleAlteredContext().getPipelineChannelFactory();
ExecuteEngine incrementalDumperExecuteEngine = jobContext.getRuleAlteredContext().getIncrementalDumperExecuteEngine();
TaskConfiguration taskConfig = jobContext.getTaskConfig();
PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, taskConfig, dataSourceManager));
PipelineTableMetaDataLoader sourceMetaDataLoader = jobContext.getSourceMetaDataLoader();
IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getHandleConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
pipelineChannelFactory, dataSourceManager, incrementalDumperExecuteEngine);
pipelineChannelFactory, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine);
jobContext.getIncrementalTasks().add(incrementalTask);
}


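The commit message also mentions a unit test, which is not among the hunks shown above. As a rough, hypothetical JUnit 4 illustration (not the committed test) of the sharing behaviour the refactoring relies on — repeated lookups must resolve to the same lazily created instance — such a check could look like this:

import static org.junit.Assert.assertSame;

import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.junit.Test;

public final class SharedInstanceTest {

    private final LazyInitializer<Object> lazyInitializer = new LazyInitializer<Object>() {

        @Override
        protected Object initialize() {
            return new Object();
        }
    };

    @Test
    public void assertSameInstanceOnRepeatedGet() throws ConcurrentException {
        // Both calls must return the single instance created by initialize().
        assertSame(lazyInitializer.get(), lazyInitializer.get());
    }
}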