Skip to content

Commit

Permalink
AB: Addressing feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ma-g-22 committed Apr 26, 2024
1 parent 48b2193 commit c5f579f
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 153 deletions.
192 changes: 127 additions & 65 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,25 +234,55 @@ const (
Unprotected
)

// Defines an automated backup policy for a table. Automated backups are enabled
// with the specified retention period and frequency in seconds. Currently the
// only valid retention period is 72 hours and valid frequency is 24 hours.
// Use a Nil pointer of this AutomatedBackupPolicy struct type to not change the
// policy. Use a AutomatedBackupPolicy struct with Nil values for the individual
// fields to not update those individual fields. Use a AutomatedBackupPolicy struct
// with zero values for both RetentionPeriod and Frequency to disable Automated Backups.
type AutomatedBackupPolicy struct {
type TableAutomatedBackupConfig interface {
isTableAutomatedBackupConfig()
}

// Defines an automated backup policy for a table.
// Use Nil to disable Automated Backups.
type TableAutomatedBackupPolicy struct {
// Required. How long the automated backups should be retained. The only
// supported value at this time is 3 days.
RetentionPeriod optional.Duration
Frequency optional.Duration
// Required. How frequently automated backups should occur. The only
// supported value at this time is 24 hours.
Frequency optional.Duration
}

var disableAutomatedBackupPolicy = AutomatedBackupPolicy{
RetentionPeriod: time.Duration(0),
Frequency: time.Duration(0),
func (*TableAutomatedBackupPolicy) isTableAutomatedBackupConfig() {}

func toAutomatedBackupConfigProto(automatedBackupConfig TableAutomatedBackupConfig) (*btapb.Table_AutomatedBackupPolicy_, error) {
if automatedBackupConfig == nil {
return nil, nil
}
switch backupConfig := automatedBackupConfig.(type) {
case *TableAutomatedBackupPolicy:
return backupConfig.toProto()
default:
return nil, fmt.Errorf("error: Unknown type of automated backup configuration")
}
}

func DisableAutomatedBackupPolicy() AutomatedBackupPolicy {
return disableAutomatedBackupPolicy
func (abp *TableAutomatedBackupPolicy) toProto() (*btapb.Table_AutomatedBackupPolicy_, error) {
if abp == nil {
return nil, nil
}
pbAutomatedBackupPolicy := &btapb.Table_AutomatedBackupPolicy{
RetentionPeriod: durationpb.New(0),
Frequency: durationpb.New(0),
}
if abp.RetentionPeriod == nil && abp.Frequency == nil {
return nil, errors.New("at least one of RetentionPeriod and Frequency must be set")
}
if abp.RetentionPeriod != nil {
pbAutomatedBackupPolicy.RetentionPeriod = durationpb.New(optional.ToDuration(abp.RetentionPeriod))
}
if abp.Frequency != nil {
pbAutomatedBackupPolicy.Frequency = durationpb.New(optional.ToDuration(abp.Frequency))
}
return &btapb.Table_AutomatedBackupPolicy_{
AutomatedBackupPolicy: pbAutomatedBackupPolicy,
}, nil
}

// Family represents a column family with its optional GC policy and value type.
Expand All @@ -277,7 +307,7 @@ type TableConf struct {
DeletionProtection DeletionProtection
ChangeStreamRetention ChangeStreamRetention
// Configure an automated backup policy for the table
AutomatedBackupPolicy *AutomatedBackupPolicy
AutomatedBackupConfig TableAutomatedBackupConfig
}

// CreateTable creates a new table in the instance.
Expand Down Expand Up @@ -317,17 +347,15 @@ func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf)
tbl.ChangeStreamConfig = &btapb.ChangeStreamConfig{}
tbl.ChangeStreamConfig.RetentionPeriod = durationpb.New(conf.ChangeStreamRetention.(time.Duration))
}
if conf.AutomatedBackupPolicy != nil && *conf.AutomatedBackupPolicy != disableAutomatedBackupPolicy {
// Create Automated Backup Policy
if (conf.AutomatedBackupPolicy.Frequency != nil || conf.AutomatedBackupPolicy.RetentionPeriod != nil) &&
!(conf.AutomatedBackupPolicy.Frequency != nil && conf.AutomatedBackupPolicy.RetentionPeriod != nil) {
return errors.New("both Frequency and RetentionPeriod must be specified when creating a table with an AutomatedBackupPolicy")

var err error
if conf.AutomatedBackupConfig != nil {
tbl.AutomatedBackupConfig, err = toAutomatedBackupConfigProto(conf.AutomatedBackupConfig)
if err != nil {
return err
}
frequency := durationpb.New(conf.AutomatedBackupPolicy.Frequency.(time.Duration))
retention_period := durationpb.New(conf.AutomatedBackupPolicy.RetentionPeriod.(time.Duration))
automated_backup_policy := &btapb.Table_AutomatedBackupPolicy{RetentionPeriod: retention_period, Frequency: frequency}
tbl.AutomatedBackupConfig = &btapb.Table_AutomatedBackupPolicy_{AutomatedBackupPolicy: automated_backup_policy}
}

if conf.Families != nil && conf.ColumnFamilies != nil {
return errors.New("only one of Families or ColumnFamilies may be set, not both")
}
Expand Down Expand Up @@ -362,7 +390,7 @@ func (ac *AdminClient) CreateTableFromConf(ctx context.Context, conf *TableConf)
Table: &tbl,
InitialSplits: reqSplits,
}
_, err := ac.tClient.CreateTable(ctx, req)
_, err = ac.tClient.CreateTable(ctx, req)
return err
}

Expand Down Expand Up @@ -406,44 +434,78 @@ func (ac *AdminClient) CreateColumnFamilyWithConfig(ctx context.Context, table,
return err
}

const (
deletionProtectionFieldMask = "deletion_protection"
changeStreamRetentionFieldMask = "change_stream_config"
automatedBackupPolicyFieldMask = "automated_backup_policy"
)

// UpdateTableConf contains all of the information necessary to update a table with column families.
type UpdateTableConf struct {
tableID string
// deletionProtection can be unset, true or false
// set to true to make the table protected against data loss
deletionProtection DeletionProtection
changeStreamRetention ChangeStreamRetention
// Configure an automated backup policy for the table
automatedBackupPolicy *AutomatedBackupPolicy
// Set an automated backup configuration for the table
automatedBackupConfig TableAutomatedBackupConfig

includeInUpdateMask map[string]bool
}

// UpdateTableDisableChangeStream updates a table to disable change stream for table ID.
func (ac *AdminClient) UpdateTableDisableChangeStream(ctx context.Context, tableID string) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, None, time.Duration(0), nil})
return ac.updateTableWithConf(ctx, &UpdateTableConf{
tableID: tableID,
changeStreamRetention: time.Duration(0),
includeInUpdateMask: map[string]bool{
changeStreamRetentionFieldMask: true,
}})
}

// UpdateTableWithChangeStream updates a table to with the given table ID and change stream config.
func (ac *AdminClient) UpdateTableWithChangeStream(ctx context.Context, tableID string, changeStreamRetention ChangeStreamRetention) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, None, changeStreamRetention, nil})
return ac.updateTableWithConf(ctx, &UpdateTableConf{
tableID: tableID,
deletionProtection: None,
changeStreamRetention: changeStreamRetention,
includeInUpdateMask: map[string]bool{
changeStreamRetentionFieldMask: true,
}})
}

// UpdateTableWithDeletionProtection updates a table with the given table ID and deletion protection parameter.
func (ac *AdminClient) UpdateTableWithDeletionProtection(ctx context.Context, tableID string, deletionProtection DeletionProtection) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, deletionProtection, nil, nil})
return ac.updateTableWithConf(ctx, &UpdateTableConf{
tableID: tableID,
deletionProtection: deletionProtection,
includeInUpdateMask: map[string]bool{
deletionProtectionFieldMask: true,
}})
}

// UpdateTableDisableAutomatedBackupPolicy updates a table to disable automated backups for table ID.
func (ac *AdminClient) UpdateTableDisableAutomatedBackupPolicy(ctx context.Context, tableID string) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, None, nil, &disableAutomatedBackupPolicy})
return ac.updateTableWithConf(ctx, &UpdateTableConf{
tableID: tableID,
deletionProtection: None,
includeInUpdateMask: map[string]bool{
automatedBackupPolicyFieldMask: true,
}})
}

// UpdateTableWithAutomatedBackupPolicy updates a table to with the given table ID and automated backup policy config.
func (ac *AdminClient) UpdateTableWithAutomatedBackupPolicy(ctx context.Context, tableID string, automatedBackupPolicy AutomatedBackupPolicy) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{tableID, None, nil, &automatedBackupPolicy})
func (ac *AdminClient) UpdateTableWithAutomatedBackupPolicy(ctx context.Context, tableID string, automatedBackupPolicy TableAutomatedBackupPolicy) error {
return ac.updateTableWithConf(ctx, &UpdateTableConf{
tableID: tableID,
deletionProtection: None,
automatedBackupConfig: &automatedBackupPolicy,
includeInUpdateMask: map[string]bool{
automatedBackupPolicyFieldMask: true,
}})
}

// updateTableWithConf updates a table in the instance from the given configuration.
// only deletion protection can be updated at this period.
// table ID is required.
func (ac *AdminClient) updateTableWithConf(ctx context.Context, conf *UpdateTableConf) error {
if conf.tableID == "" {
Expand All @@ -463,45 +525,40 @@ func (ac *AdminClient) updateTableWithConf(ctx context.Context, conf *UpdateTabl
UpdateMask: updateMask,
}

if conf.deletionProtection != None {
updateMask.Paths = append(updateMask.Paths, "deletion_protection")
if _, include := conf.includeInUpdateMask[deletionProtectionFieldMask]; include {
updateMask.Paths = append(updateMask.Paths, deletionProtectionFieldMask)
req.Table.DeletionProtection = conf.deletionProtection != Unprotected
}

if conf.changeStreamRetention != nil {
if _, include := conf.includeInUpdateMask[changeStreamRetentionFieldMask]; include {
if conf.changeStreamRetention.(time.Duration) == time.Duration(0) {
updateMask.Paths = append(updateMask.Paths, "change_stream_config")
updateMask.Paths = append(updateMask.Paths, changeStreamRetentionFieldMask)
} else {
updateMask.Paths = append(updateMask.Paths, "change_stream_config.retention_period")
updateMask.Paths = append(updateMask.Paths, changeStreamRetentionFieldMask+".retention_period")
req.Table.ChangeStreamConfig = &btapb.ChangeStreamConfig{}
req.Table.ChangeStreamConfig.RetentionPeriod = durationpb.New(conf.changeStreamRetention.(time.Duration))
}
}

if conf.automatedBackupPolicy != nil {
if _, include := conf.includeInUpdateMask[automatedBackupPolicyFieldMask]; include {
// Update Automated Backup Policy
if *conf.automatedBackupPolicy == disableAutomatedBackupPolicy {
if conf.automatedBackupConfig == nil {
// Disable Automated Backup Policy
updateMask.Paths = append(updateMask.Paths, "automated_backup_policy")
} else if conf.automatedBackupPolicy.Frequency != nil && conf.automatedBackupPolicy.RetentionPeriod != nil {
// Update both Retention Period and Frequency
updateMask.Paths = append(updateMask.Paths, "automated_backup_policy")
frequency := durationpb.New(conf.automatedBackupPolicy.Frequency.(time.Duration))
retentionPeriod := durationpb.New(conf.automatedBackupPolicy.RetentionPeriod.(time.Duration))
automated_backup_policy := &btapb.Table_AutomatedBackupPolicy{RetentionPeriod: retentionPeriod, Frequency: frequency}
req.Table.AutomatedBackupConfig = &btapb.Table_AutomatedBackupPolicy_{AutomatedBackupPolicy: automated_backup_policy}
} else if conf.automatedBackupPolicy.Frequency != nil {
// Update Frequency
updateMask.Paths = append(updateMask.Paths, "automated_backup_policy.frequency")
frequency := durationpb.New(conf.automatedBackupPolicy.Frequency.(time.Duration))
automated_backup_policy := &btapb.Table_AutomatedBackupPolicy{Frequency: frequency}
req.Table.AutomatedBackupConfig = &btapb.Table_AutomatedBackupPolicy_{AutomatedBackupPolicy: automated_backup_policy}
} else if conf.automatedBackupPolicy.RetentionPeriod != nil {
// Update Retention Period
updateMask.Paths = append(updateMask.Paths, "automated_backup_policy.retention_period")
retentionPeriod := durationpb.New(conf.automatedBackupPolicy.RetentionPeriod.(time.Duration))
automated_backup_policy := &btapb.Table_AutomatedBackupPolicy{RetentionPeriod: retentionPeriod}
req.Table.AutomatedBackupConfig = &btapb.Table_AutomatedBackupPolicy_{AutomatedBackupPolicy: automated_backup_policy}
updateMask.Paths = append(updateMask.Paths, automatedBackupPolicyFieldMask)
} else {
abc, err := toAutomatedBackupConfigProto(conf.automatedBackupConfig)
if err != nil {
return err
}
if abc.AutomatedBackupPolicy.RetentionPeriod.Seconds != 0 {
// Update Retention Period
updateMask.Paths = append(updateMask.Paths, automatedBackupPolicyFieldMask+".retention_period")
}
if abc.AutomatedBackupPolicy.Frequency.Seconds != 0 {
// Update Frequency
updateMask.Paths = append(updateMask.Paths, automatedBackupPolicyFieldMask+".frequency")
}
req.Table.AutomatedBackupConfig = abc
}
}

Expand Down Expand Up @@ -554,7 +611,7 @@ type TableInfo struct {
// for example when using NAME_ONLY, the response does not contain DeletionProtection and the value should be None
DeletionProtection DeletionProtection
ChangeStreamRetention ChangeStreamRetention
AutomatedBackupPolicy *AutomatedBackupPolicy
AutomatedBackupConfig TableAutomatedBackupConfig
}

// FamilyInfo represents information about a column family.
Expand Down Expand Up @@ -614,9 +671,14 @@ func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo,
ti.ChangeStreamRetention = res.ChangeStreamConfig.RetentionPeriod.AsDuration()
}
if res.AutomatedBackupConfig != nil {
ti.AutomatedBackupPolicy = &AutomatedBackupPolicy{
Frequency: res.GetAutomatedBackupPolicy().Frequency.AsDuration(),
RetentionPeriod: res.GetAutomatedBackupPolicy().RetentionPeriod.AsDuration(),
switch res.AutomatedBackupConfig.(type) {
case *btapb.Table_AutomatedBackupPolicy_:
ti.AutomatedBackupConfig = &TableAutomatedBackupPolicy{
RetentionPeriod: res.GetAutomatedBackupPolicy().GetRetentionPeriod().AsDuration(),
Frequency: res.GetAutomatedBackupPolicy().GetFrequency().AsDuration(),
}
default:
return nil, fmt.Errorf("error: Unknown type of automated backup configuration")
}
}

Expand Down

0 comments on commit c5f579f

Please sign in to comment.