Skip to content

Commit

Permalink
feat(bigtable): Adding automated backups (#9702)
Browse files Browse the repository at this point in the history
* Adding automated backups

* AB updates: addressing feedback

* AB: addressing feedback

* AB: Addressing feedback

* AB: addressing feedback

* AB: addressing feedbackup

* AB: fixing workflow errors

* AB: fixing conflict

* AB: addressing feedback

* AB: refactoring

* AB: addressing feedback

* AB: addressing feedback

* AB: addressing feedback

* AB: fixing vet.sh workflow errors

* AB: fixing apidiff workflow error

* AB: Fixing vet workflow errors

* AB: fixing dependencies

* AB: fixing vet.sh error

* AB: updating go.mod

* AB: fixing go.sum

* AB: adding new lines

* AB: fixing go.mod & go.sum

* AB: fixing deps

* AB: fixing deps

* AB: udpating go.sum

---------

Co-authored-by: Baha Aiman <bahaaiman@google.com>
Co-authored-by: Igor Bernstein <igorbernstein@google.com>
  • Loading branch information
3 people committed May 8, 2024
1 parent 02aa7cf commit 9738386
Show file tree
Hide file tree
Showing 5 changed files with 552 additions and 58 deletions.
210 changes: 161 additions & 49 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,67 @@ const (
Unprotected
)

// TableAutomatedBackupConfig generalizes automated backup configurations.
// Currently, the only supported type of automated backup configuration
// is TableAutomatedBackupPolicy.
type TableAutomatedBackupConfig interface {
isTableAutomatedBackupConfig()
}

// TableAutomatedBackupPolicy defines an automated backup policy for a table.
// Use nil TableAutomatedBackupPolicy to disable Automated Backups on a table.
// Use nil for a specific field to ignore that field when updating the policy on a table.
type TableAutomatedBackupPolicy struct {
// How long the automated backups should be retained. The only
// supported value at this time is 3 days.
RetentionPeriod optional.Duration
// How frequently automated backups should occur. The only
// supported value at this time is 24 hours.
Frequency optional.Duration
}

func (*TableAutomatedBackupPolicy) isTableAutomatedBackupConfig() {}

func toAutomatedBackupConfigProto(automatedBackupConfig TableAutomatedBackupConfig) (*btapb.Table_AutomatedBackupPolicy_, error) {
if automatedBackupConfig == nil {
return nil, nil
}
switch backupConfig := automatedBackupConfig.(type) {
case *TableAutomatedBackupPolicy:
return backupConfig.toProto()
default:
return nil, fmt.Errorf("error: Unknown type of automated backup configuration")
}
}

func (abp *TableAutomatedBackupPolicy) toProto() (*btapb.Table_AutomatedBackupPolicy_, error) {
pbAutomatedBackupPolicy := &btapb.Table_AutomatedBackupPolicy{
RetentionPeriod: durationpb.New(0),
Frequency: durationpb.New(0),
}
if abp.RetentionPeriod == nil && abp.Frequency == nil {
return nil, errors.New("at least one of RetentionPeriod and Frequency must be set")
}
if abp.RetentionPeriod != nil {
pbAutomatedBackupPolicy.RetentionPeriod = durationpb.New(optional.ToDuration(abp.RetentionPeriod))
}
if abp.Frequency != nil {
pbAutomatedBackupPolicy.Frequency = durationpb.New(optional.ToDuration(abp.Frequency))
}
return &btapb.Table_AutomatedBackupPolicy_{
AutomatedBackupPolicy: pbAutomatedBackupPolicy,
}, nil
}

// Family represents a column family with its optional GC policy and value type.
type Family struct {
GCPolicy GCPolicy
ValueType Type
}

// UpdateTableConf is unused
type UpdateTableConf struct{}

// TableConf contains all the information necessary to create a table with column families.
type TableConf struct {
TableID string
Expand All @@ -259,6 +314,8 @@ type TableConf struct {
// set to protected to make the table protected against data loss
DeletionProtection DeletionProtection
ChangeStreamRetention ChangeStreamRetention
// Configure an automated backup policy for the table
AutomatedBackupConfig TableAutomatedBackupConfig
}

// CreateTable creates a new table in the instance.
Expand Down Expand Up @@ -298,6 +355,15 @@ func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf)
tbl.ChangeStreamConfig = &btapb.ChangeStreamConfig{}
tbl.ChangeStreamConfig.RetentionPeriod = durationpb.New(conf.ChangeStreamRetention.(time.Duration))
}

if conf.AutomatedBackupConfig != nil {
proto, err := toAutomatedBackupConfigProto(conf.AutomatedBackupConfig)
if err != nil {
return err
}
tbl.AutomatedBackupConfig = proto
}

if conf.Families != nil && conf.ColumnFamilies != nil {
return errors.New("only one of Families or ColumnFamilies may be set, not both")
}
Expand Down Expand Up @@ -376,79 +442,113 @@ func (ac *AdminClient) CreateColumnFamilyWithConfig(ctx context.Context, table,
return err
}

// UpdateTableConf contains all of the information necessary to update a table with column families.
type UpdateTableConf struct {
tableID string
// deletionProtection can be unset, true or false
// set to true to make the table protected against data loss
deletionProtection DeletionProtection
changeStreamRetention ChangeStreamRetention
}

// UpdateTableDisableChangeStream updates a table to disable change stream for table ID.
func (ac *AdminClient) UpdateTableDisableChangeStream(ctx context.Context, tableID string) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, None, time.Duration(0)})
}

// UpdateTableWithChangeStream updates a table to with the given table ID and change stream config.
func (ac *AdminClient) UpdateTableWithChangeStream(ctx context.Context, tableID string, changeStreamRetention ChangeStreamRetention) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, None, changeStreamRetention})
}

// UpdateTableWithDeletionProtection updates a table with the given table ID and deletion protection parameter.
func (ac *AdminClient) UpdateTableWithDeletionProtection(ctx context.Context, tableID string, deletionProtection DeletionProtection) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, deletionProtection, nil})
}
const (
deletionProtectionFieldMask = "deletion_protection"
changeStreamConfigFieldMask = "change_stream_config"
automatedBackupPolicyFieldMask = "automated_backup_policy"
retentionPeriodFieldMaskPath = ".retention_period"
frequencyFieldMaskPath = ".frequency"
)

// updateTableWithConf updates a table in the instance from the given configuration.
// only deletion protection can be updated at this period.
// table ID is required.
func (ac *AdminClient) updateTableWithConf(ctx context.Context, conf *UpdateTableConf) error {
if conf.tableID == "" {
return errors.New("TableID is required")
func (ac *AdminClient) newUpdateTableRequestProto(tableID string) (*btapb.UpdateTableRequest, error) {
if tableID == "" {
return nil, errors.New("TableID is required")
}

ctx = mergeOutgoingMetadata(ctx, ac.md)

updateMask := &field_mask.FieldMask{
Paths: []string{},
}
prefix := ac.instancePrefix()
req := &btapb.UpdateTableRequest{
Table: &btapb.Table{
Name: prefix + "/tables/" + conf.tableID,
Name: ac.instancePrefix() + "/tables/" + tableID,
},
UpdateMask: updateMask,
}
return req, nil
}

if conf.deletionProtection != None {
updateMask.Paths = append(updateMask.Paths, "deletion_protection")
req.Table.DeletionProtection = conf.deletionProtection != Unprotected
}

if conf.changeStreamRetention != nil {
if conf.changeStreamRetention.(time.Duration) == time.Duration(0) {
updateMask.Paths = append(updateMask.Paths, "change_stream_config")
} else {
updateMask.Paths = append(updateMask.Paths, "change_stream_config.retention_period")
req.Table.ChangeStreamConfig = &btapb.ChangeStreamConfig{}
req.Table.ChangeStreamConfig.RetentionPeriod = durationpb.New(conf.changeStreamRetention.(time.Duration))
}
}
func (ac *AdminClient) updateTableAndWait(ctx context.Context, updateTableRequest *btapb.UpdateTableRequest) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)

lro, err := ac.tClient.UpdateTable(ctx, req)
lro, err := ac.tClient.UpdateTable(ctx, updateTableRequest)
if err != nil {
return fmt.Errorf("error from update: %w", err)
}

var tbl btapb.Table
op := longrunning.InternalNewOperation(ac.lroClient, lro)
err = op.Wait(ctx, &tbl)
if err != nil {
return fmt.Errorf("error from operation: %v", err)
}

return nil
}

// UpdateTableDisableChangeStream updates a table to disable change stream for table ID.
func (ac *AdminClient) UpdateTableDisableChangeStream(ctx context.Context, tableID string) error {
req, err := ac.newUpdateTableRequestProto(tableID)
if err != nil {
return err
}
req.UpdateMask.Paths = []string{changeStreamConfigFieldMask}
return ac.updateTableAndWait(ctx, req)
}

// UpdateTableWithChangeStream updates a table to with the given table ID and change stream config.
func (ac *AdminClient) UpdateTableWithChangeStream(ctx context.Context, tableID string, changeStreamRetention ChangeStreamRetention) error {
req, err := ac.newUpdateTableRequestProto(tableID)
if err != nil {
return err
}
req.UpdateMask.Paths = []string{changeStreamConfigFieldMask + retentionPeriodFieldMaskPath}
req.Table.ChangeStreamConfig = &btapb.ChangeStreamConfig{}
req.Table.ChangeStreamConfig.RetentionPeriod = durationpb.New(changeStreamRetention.(time.Duration))
return ac.updateTableAndWait(ctx, req)
}

// UpdateTableWithDeletionProtection updates a table with the given table ID and deletion protection parameter.
func (ac *AdminClient) UpdateTableWithDeletionProtection(ctx context.Context, tableID string, deletionProtection DeletionProtection) error {
req, err := ac.newUpdateTableRequestProto(tableID)
if err != nil {
return err
}
req.UpdateMask.Paths = []string{deletionProtectionFieldMask}
req.Table.DeletionProtection = deletionProtection != Unprotected
return ac.updateTableAndWait(ctx, req)
}

// UpdateTableDisableAutomatedBackupPolicy updates a table to disable automated backups for table ID.
func (ac *AdminClient) UpdateTableDisableAutomatedBackupPolicy(ctx context.Context, tableID string) error {
req, err := ac.newUpdateTableRequestProto(tableID)
if err != nil {
return err
}
req.UpdateMask.Paths = []string{automatedBackupPolicyFieldMask}
return ac.updateTableAndWait(ctx, req)
}

// UpdateTableWithAutomatedBackupPolicy updates a table to with the given table ID and automated backup policy config.
func (ac *AdminClient) UpdateTableWithAutomatedBackupPolicy(ctx context.Context, tableID string, automatedBackupPolicy TableAutomatedBackupPolicy) error {
req, err := ac.newUpdateTableRequestProto(tableID)
if err != nil {
return err
}
abc, err := toAutomatedBackupConfigProto(&automatedBackupPolicy)
if err != nil {
return err
}
if abc.AutomatedBackupPolicy.RetentionPeriod.Seconds != 0 {
// Update Retention Period
req.UpdateMask.Paths = append(req.UpdateMask.Paths, automatedBackupPolicyFieldMask+retentionPeriodFieldMaskPath)
}
if abc.AutomatedBackupPolicy.Frequency.Seconds != 0 {
// Update Frequency
req.UpdateMask.Paths = append(req.UpdateMask.Paths, automatedBackupPolicyFieldMask+frequencyFieldMaskPath)
}
req.Table.AutomatedBackupConfig = abc
return ac.updateTableAndWait(ctx, req)
}

// DeleteTable deletes a table and all of its data.
func (ac *AdminClient) DeleteTable(ctx context.Context, table string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
Expand Down Expand Up @@ -485,6 +585,7 @@ type TableInfo struct {
// for example when using NAME_ONLY, the response does not contain DeletionProtection and the value should be None
DeletionProtection DeletionProtection
ChangeStreamRetention ChangeStreamRetention
AutomatedBackupConfig TableAutomatedBackupConfig
}

// FamilyInfo represents information about a column family.
Expand Down Expand Up @@ -543,6 +644,17 @@ func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo,
if res.ChangeStreamConfig != nil && res.ChangeStreamConfig.RetentionPeriod != nil {
ti.ChangeStreamRetention = res.ChangeStreamConfig.RetentionPeriod.AsDuration()
}
if res.AutomatedBackupConfig != nil {
switch res.AutomatedBackupConfig.(type) {
case *btapb.Table_AutomatedBackupPolicy_:
ti.AutomatedBackupConfig = &TableAutomatedBackupPolicy{
RetentionPeriod: res.GetAutomatedBackupPolicy().GetRetentionPeriod().AsDuration(),
Frequency: res.GetAutomatedBackupPolicy().GetFrequency().AsDuration(),
}
default:
return nil, fmt.Errorf("error: Unknown type of automated backup configuration")
}
}

return ti, nil
}
Expand Down

0 comments on commit 9738386

Please sign in to comment.