feat: add logic in replication for handling rows/cells that may exceed thresholds #4315

Open · wants to merge 7 commits into base: main
@@ -16,21 +16,37 @@

package com.google.cloud.bigtable.hbase.replication;

import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_LARGE_ROWS;
import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES;
import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_MAX_CELLS_PER_MUTATION;
import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD;
import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_LARGE_ROWS_KEY;
import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES_KEY;
import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_MAX_CELLS_PER_MUTATION_KEY;
import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD_KEY;
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY;
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY;
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY;
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_METRIC_KEY;

import com.google.bigtable.repackaged.com.google.api.client.util.Preconditions;
import com.google.bigtable.repackaged.com.google.api.core.InternalApi;
import com.google.bigtable.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.bigtable.repackaged.com.google.common.base.Objects;
import com.google.cloud.bigtable.hbase.replication.metrics.MetricsExporter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
@@ -140,6 +156,7 @@ static MutationBuilder getMutationBuilder(Cell cell) {
private final Connection connection;
private final String tableName;
private final Map<ByteRange, List<Cell>> cellsToReplicateByRow;
private MetricsExporter metricsExporter;

public CloudBigtableReplicationTask(
String tableName, Connection connection, Map<ByteRange, List<Cell>> entriesToReplicate)
@@ -149,6 +166,16 @@ public CloudBigtableReplicationTask(
this.tableName = tableName;
}

public CloudBigtableReplicationTask(
String tableName,
Connection connection,
Map<ByteRange, List<Cell>> entriesToReplicate,
MetricsExporter metricsExporter)
throws IOException {
this(tableName, connection, entriesToReplicate);
this.metricsExporter = metricsExporter;
}

/**
* Replicates the list of WAL entries into CBT.
*
@@ -160,6 +187,7 @@ public Boolean call() {

try {
Table table = connection.getTable(TableName.valueOf(tableName));
Configuration conf = this.connection.getConfiguration();

// Collect all the cells to replicate in this call.
// All mutations in a WALEdit are atomic, this atomicity must be preserved. The order of WAL
@@ -183,18 +211,28 @@ public Boolean call() {
// Create a rowMutations and add it to the list to be flushed to CBT.
RowMutations rowMutations =
buildRowMutations(cellsByRow.getKey().deepCopyToNewArray(), cellsByRow.getValue());
rowMutationsList.add(rowMutations);
}

Object[] results = new Object[rowMutationsList.size()];
table.batch(rowMutationsList, results);
// verify that the row mutations are within the configured size and count thresholds
boolean logAndSkipIncompatibleRowMutations =
verifyRowMutationThresholds(rowMutations, conf, this.metricsExporter);

if (!logAndSkipIncompatibleRowMutations) {
rowMutationsList.add(rowMutations);
}
}

// Make sure that there were no errors returned via results.
for (Object result : results) {
if (result != null && result instanceof Throwable) {
LOG.error("Encountered error while replicating wal entry.", (Throwable) result);
succeeded = false;
break;
// commit batch
if (!rowMutationsList.isEmpty()) {
Object[] results = new Object[rowMutationsList.size()];
table.batch(rowMutationsList, results);

// Make sure that there were no errors returned via results.
for (Object result : results) {
if (result != null && result instanceof Throwable) {
LOG.error("Encountered error while replicating wal entry.", (Throwable) result);
succeeded = false;
break;
}
}
}
} catch (Throwable t) {
@@ -222,6 +260,106 @@ static RowMutations buildRowMutations(byte[] rowKey, List<Cell> cellList) throws
return rowMutationBuffer;
}

// Verifies that the shape of a row mutation is within the configured thresholds. A RowMutations
// may contain many mutations, and each mutation may contain many cells. The conditions that can
// be configured and evaluated are: 1) max cells in a single mutation, 2) total number of
// mutations in the RowMutations, and 3) max byte size of all mutations in the RowMutations.
@VisibleForTesting
static boolean verifyRowMutationThresholds(
RowMutations rowMutations, Configuration conf, MetricsExporter metricsExporter) {
boolean logAndSkipIncompatibleRowMutations = false;

// check whether threshold filtering is enabled for large rows or max cells
if (conf.getBoolean(FILTER_LARGE_ROWS_KEY, DEFAULT_FILTER_LARGE_ROWS)
|| conf.getBoolean(
FILTER_MAX_CELLS_PER_MUTATION_KEY, DEFAULT_FILTER_MAX_CELLS_PER_MUTATION)) {

// iterate over the mutations to compute total byte size and max per-mutation cell count
long totalByteSize = 0L;
int maxCellCountOfMutations = 0;
for (Mutation m : rowMutations.getMutations()) {
totalByteSize += m.heapSize();
if (maxCellCountOfMutations < m.size()) maxCellCountOfMutations = m.size();
}

// check large rows
int maxSize =
conf.getInt(
FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES_KEY,
DEFAULT_FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES);
if (conf.getBoolean(FILTER_LARGE_ROWS_KEY, DEFAULT_FILTER_LARGE_ROWS)
&& totalByteSize > maxSize) {

// exceeding limit, log and skip
logAndSkipIncompatibleRowMutations = true;
incrementDroppedIncompatibleMutationsRowSizeExceeded(metricsExporter);
LOG.warn(
"Dropping mutation, row mutations length, "
+ totalByteSize
+ ", exceeds filter length threshold ("
+ FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES_KEY
+ "), "
+ maxSize
+ ", mutation row key: "
+ Bytes.toStringBinary(rowMutations.getRow()));
}

// check max cells or max mutations
int maxCellsOrMutations =
conf.getInt(
FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD_KEY,
DEFAULT_FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD);
if (conf.getBoolean(FILTER_MAX_CELLS_PER_MUTATION_KEY, DEFAULT_FILTER_MAX_CELLS_PER_MUTATION)
&& (rowMutations.getMutations().size() > maxCellsOrMutations
|| maxCellCountOfMutations > maxCellsOrMutations)) {

// exceeding limit, log and skip
logAndSkipIncompatibleRowMutations = true;
incrementDroppedIncompatibleMutationsMaxCellsExceeded(metricsExporter);
LOG.warn(
"Dropping mutation, row mutation size with total mutations, "
+ rowMutations.getMutations().size()
+ ", or max cells per mutation, "
+ maxCellCountOfMutations
+ ", exceeds filter size ("
+ FILTER_MAX_CELLS_PER_MUTATION_KEY
+ "), "
+ maxCellsOrMutations
+ ", mutation row key: "
+ Bytes.toStringBinary(rowMutations.getRow()));
}
}
return logAndSkipIncompatibleRowMutations;
}
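To make the new check concrete, here is a minimal sketch, not part of this PR: `ThresholdSketch` is a hypothetical class placed in the same package, since the method is package-private. It enables large-row filtering with a deliberately tiny threshold, and it passes a null exporter, which the metric helpers below tolerate.

```java
package com.google.cloud.bigtable.hbase.replication;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.util.Bytes;

public class ThresholdSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Enable large-row filtering with a deliberately tiny 1 KiB threshold.
    conf.setBoolean("google.bigtable.replication.filter_large_rows", true);
    conf.setInt("google.bigtable.replication.large_rows_threshold_bytes", 1024);

    byte[] rowKey = Bytes.toBytes("row-1");
    RowMutations rowMutations = new RowMutations(rowKey);
    Put put = new Put(rowKey);
    // A 4 KiB value pushes the mutation's heap size past the 1 KiB threshold.
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), new byte[4 * 1024]);
    rowMutations.add(put);

    // Expected: true, i.e. this row would be logged and skipped rather than
    // added to the batch. The null exporter is tolerated by the metric helpers.
    boolean dropped =
        CloudBigtableReplicationTask.verifyRowMutationThresholds(rowMutations, conf, null);
    System.out.println("dropped = " + dropped);
  }
}
```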

private static void incrementMetric(
MetricsExporter metricsExporter, String metricName, int delta) {
if (metricsExporter == null) return;
metricsExporter.incCounters(metricName, delta);
}

private static void incrementDroppedIncompatibleMutationsRowSizeExceeded(
MetricsExporter metricsExporter) {
incrementMetric(metricsExporter, DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY, 1);
incrementIncompatibleMutations(metricsExporter);
incrementDroppedIncompatibleMutations(metricsExporter);
}

private static void incrementDroppedIncompatibleMutationsMaxCellsExceeded(
MetricsExporter metricsExporter) {
incrementMetric(metricsExporter, DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY, 1);
incrementIncompatibleMutations(metricsExporter);
incrementDroppedIncompatibleMutations(metricsExporter);
}

private static void incrementIncompatibleMutations(MetricsExporter metricsExporter) {
incrementMetric(metricsExporter, INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
}

private static void incrementDroppedIncompatibleMutations(MetricsExporter metricsExporter) {
incrementMetric(metricsExporter, DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
}
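Because these helpers null-check the exporter, a trivial in-memory `MetricsExporter` is enough to observe the new counters in tests. A hedged sketch: `InMemoryMetricsExporter` is hypothetical, and it assumes the interface declares `incCounters(String, long)`; the calls in this diff pass int deltas, which widen to long (if the interface actually takes an int delta, adjust the override accordingly).

```java
package com.google.cloud.bigtable.hbase.replication;

import com.google.cloud.bigtable.hbase.replication.metrics.MetricsExporter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** In-memory exporter sketch; accumulates counters by name so tests can assert on drops. */
public class InMemoryMetricsExporter implements MetricsExporter {
  private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

  @Override
  public void incCounters(String counterName, long delta) {
    counters.computeIfAbsent(counterName, k -> new AtomicLong()).addAndGet(delta);
  }

  /** Returns the current value of a counter, or 0 if it was never incremented. */
  public long counterValue(String counterName) {
    AtomicLong value = counters.get(counterName);
    return value == null ? 0L : value.get();
  }
}
```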

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -414,7 +414,8 @@ private Future<Boolean> replicateBatch(
String tableName, Map<ByteRange, List<Cell>> batchToReplicate) {
try {
CloudBigtableReplicationTask replicationTask =
new CloudBigtableReplicationTask(tableName, sharedResources.connection, batchToReplicate);
new CloudBigtableReplicationTask(
tableName, sharedResources.connection, batchToReplicate, metricsExporter);
return sharedResources.executorService.submit(replicationTask);
} catch (Exception ex) {
if (ex instanceof InterruptedException) {
@@ -16,7 +16,14 @@

package com.google.cloud.bigtable.hbase.replication.adapters;

import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_LARGE_CELLS;
import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES;
import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_LARGE_CELLS_KEY;
import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY;
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY;
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY;
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY;
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY;
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY;
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_METRIC_KEY;
import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY;
@@ -76,6 +83,12 @@ private void incrementPutsInFutureMutations() {
metricsExporter.incCounters(PUTS_IN_FUTURE_METRIC_KEY, 1);
}

private void incrementDroppedIncompatibleMutationsCellSizeExceeded() {
incrementIncompatibleMutations();
incrementDroppedIncompatibleMutations();
metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY, 1);
}

/**
* Creates an IncompatibleMutationAdapter with HBase configuration, MetricSource, and CBT
* connection.
@@ -100,6 +113,9 @@ public IncompatibleMutationAdapter(
metricsExporter.incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
metricsExporter.incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
metricsExporter.incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY, 0);
metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY, 0);
metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY, 0);
}

private boolean isValidDelete(Cell delete) {
@@ -142,6 +158,29 @@ public final List<Cell> adaptIncompatibleMutations(BigtableWALEntry walEntry) {

// All puts are valid.
if (cell.getTypeByte() == KeyValue.Type.Put.getCode()) {
// check max cell size
if (conf.getBoolean(FILTER_LARGE_CELLS_KEY, DEFAULT_FILTER_LARGE_CELLS)
&& cell.getValueLength()
> conf.getInt(
FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY,
DEFAULT_FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES)) {
// Drop the cell, as its value exceeds the configured size threshold
incrementDroppedIncompatibleMutationsCellSizeExceeded();

LOG.warn(
"Dropping mutation, cell value length, "
+ cell.getValueLength()
+ ", exceeds filter length ("
+ FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY
+ "), "
+ conf.getInt(
FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY,
DEFAULT_FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES)
+ ", cell: "
+ cell);
continue;
}

// flag if the put is issued with a future timestamp
// do not log, as clock skew could make this condition common and fill up disk space
if (cell.getTimestamp() > walEntry.getWalWriteTime()) {
@@ -54,6 +54,48 @@ private HBaseToCloudBigtableReplicationConfiguration() {}
// TODO maybe it should depend on the number of processors on the VM.
public static final int DEFAULT_THREAD_COUNT = 10;

/**
* Determines if row mutations that exceed the value of FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES_KEY
* should be logged and dropped.
*/
public static final String FILTER_LARGE_ROWS_KEY =
"google.bigtable.replication.filter_large_rows";

public static final Boolean DEFAULT_FILTER_LARGE_ROWS = false;
/**
* Determines the threshold, in bytes, above which row mutations are logged and dropped when
* replicating to Bigtable. Default: the approximate maximum row size accepted by a batch
* mutation request.
*/
public static final String FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES_KEY =
"google.bigtable.replication.large_rows_threshold_bytes";

public static final Integer DEFAULT_FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES = 190 * 1024 * 1024;

/**
* Determines if row mutations whose mutation count or per-mutation cell count exceeds the value
* of FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD_KEY should be logged and dropped when replicating
* to Bigtable. Default threshold: the approximate maximum number of cells accepted by a batch
* mutation request.
*/
public static final String FILTER_MAX_CELLS_PER_MUTATION_KEY =
"google.bigtable.replication.filter_max_cells_per_mutation";

public static final Boolean DEFAULT_FILTER_MAX_CELLS_PER_MUTATION = false;
public static final String FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD_KEY =
"google.bigtable.replication.max_cells_per_mutation";
public static final Integer DEFAULT_FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD = 100_000 - 1;

/**
* Determines if cells that exceed the value of FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY should
* be logged and dropped.
*/
public static final String FILTER_LARGE_CELLS_KEY =
"google.bigtable.replication.filter_large_cells";

public static final Boolean DEFAULT_FILTER_LARGE_CELLS = false;
public static final String FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY =
"google.bigtable.replication.large_cells_threshold_bytes";
public static final Integer DEFAULT_FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES = 100 * 1024 * 1024;
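All three filters are opt-in and default to off. A minimal sketch of enabling them programmatically with the keys defined above (the same properties could equally be set in hbase-site.xml on the source cluster); the threshold values shown are just the defaults:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

Configuration conf = HBaseConfiguration.create();

// Drop row mutations whose total heap size exceeds ~190 MiB.
conf.setBoolean("google.bigtable.replication.filter_large_rows", true);
conf.setInt("google.bigtable.replication.large_rows_threshold_bytes", 190 * 1024 * 1024);

// Drop row mutations with more than 99,999 mutations, or more than 99,999 cells in one mutation.
conf.setBoolean("google.bigtable.replication.filter_max_cells_per_mutation", true);
conf.setInt("google.bigtable.replication.max_cells_per_mutation", 100_000 - 1);

// Drop individual cells whose value exceeds 100 MiB.
conf.setBoolean("google.bigtable.replication.filter_large_cells", true);
conf.setInt("google.bigtable.replication.large_cells_threshold_bytes", 100 * 1024 * 1024);
```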

/**
* Determines the size of request to CBT. This parameter controls the number of concurrent RPCs to
* Cloud Bigtable.
@@ -34,6 +34,14 @@ private HBaseToCloudBigtableReplicationMetrics() {}
"bigtableIncompatibleDeleteMutations";
public static final String INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY =
"bigtableIncompatibleTimestampOverflowMutation";

public static final String DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY =
"bigtableIncompatibleMutationsRowSizeExceeded";
public static final String DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY =
"bigtableIncompatibleMutationsMaxCellsPerMutationExceeded";
public static final String DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY =
"bigtableIncompatibleMutationsCellSizeExceeded";

public static final String PUTS_IN_FUTURE_METRIC_KEY = "bigtablePutsInFutureMutations";
