Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): support for tables primary and foreign keys #8055

Merged
merged 10 commits into from
Jun 21, 2023
138 changes: 138 additions & 0 deletions bigquery/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,106 @@ type TableMetadata struct {
// - '': empty string. Default to case-sensitive behavior.
// More information: https://cloud.google.com/bigquery/docs/reference/standard-sql/collation-concepts
DefaultCollation string

// TableConstraints contains table primary and foreign keys constraints.
// Present only if the table has primary or foreign keys.
TableConstraints *TableConstraints
}

// TableConstraints defines the primary key and foreign key of a table.
type TableConstraints struct {
// PrimaryKey constraint on a table's columns.
// Present only if the table has a primary key.
// The primary key is not enforced.
PrimaryKey *PrimaryKey

// ForeignKeys represent a list of foreign keys constraints.
// Foreign keys are not enforced.
ForeignKeys []*ForeignKey
}

// PrimaryKey represents the primary key constraint on a table's columns.
type PrimaryKey struct {
// Columns that compose the primary key constraint.
Columns []string
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
}

func (pk *PrimaryKey) toBQ() *bq.TableConstraintsPrimaryKey {
return &bq.TableConstraintsPrimaryKey{
Columns: pk.Columns,
}
}

func bqToPrimaryKey(tc *bq.TableConstraints) *PrimaryKey {
if tc.PrimaryKey == nil {
return nil
}
return &PrimaryKey{
Columns: tc.PrimaryKey.Columns,
}
}

// ForeignKey represents a foreign key constraint on a table's columns.
type ForeignKey struct {
// Foreign key constraint name.
Name string

// Table that holds the primary key and is referenced by this foreign key.
ReferencedTable *Table

// Columns that compose the foreign key.
ColumnReferences []*ColumnReference
}

func (fk *ForeignKey) toBQ() *bq.TableConstraintsForeignKeys {
colRefs := []*bq.TableConstraintsForeignKeysColumnReferences{}
for _, colRef := range fk.ColumnReferences {
colRefs = append(colRefs, colRef.toBQ())
}
return &bq.TableConstraintsForeignKeys{
Name: fk.Name,
ReferencedTable: &bq.TableConstraintsForeignKeysReferencedTable{
DatasetId: fk.ReferencedTable.DatasetID,
ProjectId: fk.ReferencedTable.ProjectID,
TableId: fk.ReferencedTable.TableID,
},
ColumnReferences: colRefs,
}
}

func bqToForeignKeys(tc *bq.TableConstraints, c *Client) []*ForeignKey {
fks := []*ForeignKey{}
for _, fk := range tc.ForeignKeys {
colRefs := []*ColumnReference{}
for _, colRef := range fk.ColumnReferences {
colRefs = append(colRefs, &ColumnReference{
ReferencedColumn: colRef.ReferencedColumn,
ReferencingColumn: colRef.ReferencingColumn,
})
}
fks = append(fks, &ForeignKey{
Name: fk.Name,
ReferencedTable: c.DatasetInProject(fk.ReferencedTable.DatasetId, fk.ReferencedTable.ProjectId).Table(fk.ReferencedTable.TableId),
ColumnReferences: colRefs,
})
}
return fks
}

// ColumnReference represents the pair of the foreign key column and primary key column.
type ColumnReference struct {
// ReferencingColumn is the column in the current table that composes the foreign key.
ReferencingColumn string
// ReferencedColumn is the column in the primary key of the foreign table that
// is referenced by the ReferencingColumn.
ReferencedColumn string
}

func (colRef *ColumnReference) toBQ() *bq.TableConstraintsForeignKeysColumnReferences {
return &bq.TableConstraintsForeignKeysColumnReferences{
ReferencedColumn: colRef.ReferencedColumn,
ReferencingColumn: colRef.ReferencingColumn,
}
}

// TableCreateDisposition specifies the circumstances under which destination table will be created.
Expand Down Expand Up @@ -675,6 +775,19 @@ func (tm *TableMetadata) toBQ() (*bq.Table, error) {
return nil, errors.New("cannot set ETag on create")
}
t.DefaultCollation = string(tm.DefaultCollation)

if tm.TableConstraints != nil {
t.TableConstraints = &bq.TableConstraints{}
if tm.TableConstraints.PrimaryKey != nil {
t.TableConstraints.PrimaryKey = tm.TableConstraints.PrimaryKey.toBQ()
}
if len(tm.TableConstraints.ForeignKeys) > 0 {
t.TableConstraints.ForeignKeys = make([]*bq.TableConstraintsForeignKeys, len(tm.TableConstraints.ForeignKeys))
for i, fk := range tm.TableConstraints.ForeignKeys {
t.TableConstraints.ForeignKeys[i] = fk.toBQ()
}
}
}
return t, nil
}

Expand Down Expand Up @@ -788,6 +901,12 @@ func bqToTableMetadata(t *bq.Table, c *Client) (*TableMetadata, error) {
}
md.ExternalDataConfig = edc
}
if t.TableConstraints != nil {
md.TableConstraints = &TableConstraints{
PrimaryKey: bqToPrimaryKey(t.TableConstraints),
ForeignKeys: bqToForeignKeys(t.TableConstraints, c),
}
}
return md, nil
}

Expand Down Expand Up @@ -947,6 +1066,21 @@ func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) {
t.DefaultCollation = optional.ToString(tm.DefaultCollation)
forceSend("DefaultCollation")
}
if tm.TableConstraints != nil {
t.TableConstraints = &bq.TableConstraints{}
if tm.TableConstraints.PrimaryKey != nil {
t.TableConstraints.PrimaryKey = tm.TableConstraints.PrimaryKey.toBQ()
t.TableConstraints.PrimaryKey.ForceSendFields = append(t.TableConstraints.PrimaryKey.ForceSendFields, "Columns")
t.TableConstraints.ForceSendFields = append(t.TableConstraints.ForceSendFields, "PrimaryKey")
}
if tm.TableConstraints.ForeignKeys != nil {
t.TableConstraints.ForeignKeys = make([]*bq.TableConstraintsForeignKeys, len(tm.TableConstraints.ForeignKeys))
for i, fk := range tm.TableConstraints.ForeignKeys {
t.TableConstraints.ForeignKeys[i] = fk.toBQ()
}
t.TableConstraints.ForceSendFields = append(t.TableConstraints.ForceSendFields, "ForeignKeys")
}
}
labels, forces, nulls := tm.update()
t.Labels = labels
t.ForceSendFields = append(t.ForceSendFields, forces...)
Expand Down Expand Up @@ -1024,6 +1158,10 @@ type TableMetadataToUpdate struct {
// in the table.
DefaultCollation optional.String

// TableConstraints allows modification of table constraints
// such as primary and foreign keys.
TableConstraints *TableConstraints

labelUpdater
}

Expand Down
186 changes: 186 additions & 0 deletions bigquery/table_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,3 +661,189 @@ func TestIntegration_TableDefaultCollation(t *testing.T) {
}
}
}

func TestIntegration_TableConstraintsPK(t *testing.T) {
// Test Primary Keys for Table.Create and Table.Update
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()
table := dataset.Table(tableIDs.New())
err := table.Create(context.Background(), &TableMetadata{
Schema: schema,
TableConstraints: &TableConstraints{
PrimaryKey: &PrimaryKey{
Columns: []string{"name"},
},
},
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer table.Delete(ctx)
md, err := table.Metadata(ctx)
if err != nil {
t.Fatal(err)
}
if md.TableConstraints.PrimaryKey.Columns[0] != "name" {
t.Fatalf("expected table primary key to contain column `name`, but found %q", md.TableConstraints.PrimaryKey.Columns)
}

md, err = table.Update(ctx, TableMetadataToUpdate{
TableConstraints: &TableConstraints{
PrimaryKey: &PrimaryKey{}, // clean primary keys
},
}, "")
if err != nil {
t.Fatal(err)
}
if md.TableConstraints != nil {
t.Fatalf("expected table primary keys to be removed, but found %v", md.TableConstraints.PrimaryKey)
}

tableNoPK := dataset.Table(tableIDs.New())
err = tableNoPK.Create(context.Background(), &TableMetadata{
Schema: schema,
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer tableNoPK.Delete(ctx)
md, err = tableNoPK.Metadata(ctx)
if err != nil {
t.Fatal(err)
}
if md.TableConstraints != nil {
t.Fatalf("expected table to not have a PK, but found %v", md.TableConstraints.PrimaryKey.Columns)
}

md, err = tableNoPK.Update(ctx, TableMetadataToUpdate{
TableConstraints: &TableConstraints{
PrimaryKey: &PrimaryKey{
Columns: []string{"name"},
},
},
}, "")
if err != nil {
t.Fatal(err)
}
if md.TableConstraints.PrimaryKey == nil || md.TableConstraints.PrimaryKey.Columns[0] != "name" {
t.Fatalf("expected table primary key to contain column `name`, but found %v", md.TableConstraints.PrimaryKey)
}
}

func TestIntegration_TableConstraintsFK(t *testing.T) {
// Test Foreign keys for Table.Create and Table.Update
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()
tableA := dataset.Table(tableIDs.New())
schemaA := []*FieldSchema{
{Name: "id", Type: IntegerFieldType},
{Name: "name", Type: StringFieldType},
}
err := tableA.Create(context.Background(), &TableMetadata{
Schema: schemaA,
TableConstraints: &TableConstraints{
PrimaryKey: &PrimaryKey{
Columns: []string{"id"},
},
},
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer tableA.Delete(ctx)

tableB := dataset.Table(tableIDs.New())
schemaB := []*FieldSchema{
{Name: "id", Type: IntegerFieldType},
{Name: "name", Type: StringFieldType},
{Name: "parent", Type: IntegerFieldType},
}
err = tableB.Create(context.Background(), &TableMetadata{
Schema: schemaB,
TableConstraints: &TableConstraints{
PrimaryKey: &PrimaryKey{
Columns: []string{"id"},
},
ForeignKeys: []*ForeignKey{
{
Name: "table_a_fk",
ReferencedTable: tableA,
ColumnReferences: []*ColumnReference{
{
ReferencingColumn: "parent",
ReferencedColumn: "id",
},
},
},
},
},
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer tableB.Delete(ctx)
md, err := tableB.Metadata(ctx)
if err != nil {
t.Fatal(err)
}
if len(md.TableConstraints.ForeignKeys) >= 0 && md.TableConstraints.ForeignKeys[0].Name != "table_a_fk" {
t.Fatalf("expected table to contains fk `table_a_fk`, but found %v", md.TableConstraints.ForeignKeys)
}

md, err = tableB.Update(ctx, TableMetadataToUpdate{
TableConstraints: &TableConstraints{
ForeignKeys: []*ForeignKey{}, // clean foreign keys
},
}, "")
if err != nil {
t.Fatal(err)
}
if len(md.TableConstraints.ForeignKeys) > 0 {
t.Fatalf("expected table foreign keys to be removed, but found %v", md.TableConstraints.ForeignKeys)
}

tableNoFK := dataset.Table(tableIDs.New())
err = tableNoFK.Create(context.Background(), &TableMetadata{
Schema: schemaB,
TableConstraints: &TableConstraints{
PrimaryKey: &PrimaryKey{
Columns: []string{"id"},
},
},
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer tableNoFK.Delete(ctx)
md, err = tableNoFK.Update(ctx, TableMetadataToUpdate{
TableConstraints: &TableConstraints{
ForeignKeys: []*ForeignKey{
{
Name: "table_a_fk",
ReferencedTable: tableA,
ColumnReferences: []*ColumnReference{
{
ReferencedColumn: "id",
ReferencingColumn: "parent",
},
},
},
},
},
}, "")
if err != nil {
t.Fatal(err)
}
if len(md.TableConstraints.ForeignKeys) == 0 || md.TableConstraints.ForeignKeys[0].Name != "table_a_fk" {
t.Fatalf("expected table to contains fk `table_a_fk`, but found %v", md.TableConstraints.ForeignKeys)
}
}