Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): support for tables primary and foreign keys #8055

Merged
merged 10 commits into from
Jun 21, 2023
131 changes: 131 additions & 0 deletions bigquery/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,99 @@ type TableMetadata struct {
// - '': empty string. Default to case-sensitive behavior.
// More information: https://cloud.google.com/bigquery/docs/reference/standard-sql/collation-concepts
DefaultCollation string

// PrimaryKey constraint on a table's columns.
// Present only if the table has a primary key.
// The primary key is not enforced.
PrimaryKey *PrimaryKey

// ForeignKeys represent a list of foreign keys constraints.
// The foreign key is not enforced.
ForeignKeys []*ForeignKey
}

// PrimaryKey represents the primary key constraint on a table's columns.
type PrimaryKey struct {
// Columns that compose the primary key constraint.
Columns []string
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
}

func (pk *PrimaryKey) toBQ() *bq.TableConstraintsPrimaryKey {
return &bq.TableConstraintsPrimaryKey{
Columns: pk.Columns,
}
}

func bqToPrimaryKey(tc *bq.TableConstraints) *PrimaryKey {
if tc.PrimaryKey == nil {
return nil
}
return &PrimaryKey{
Columns: tc.PrimaryKey.Columns,
}
}

// ForeignKey represents a foreign key constraint on a table's columns.
type ForeignKey struct {
// Foreign key constraint name.
Name string

// Table that holds the primary key and is referenced by this foreign key.
ReferencedTable *Table

// Columns that compose the foreign key.
ColumnReferences []*ColumnReference
}

func (fk *ForeignKey) toBQ() *bq.TableConstraintsForeignKeys {
colRefs := []*bq.TableConstraintsForeignKeysColumnReferences{}
for _, colRef := range fk.ColumnReferences {
colRefs = append(colRefs, colRef.toBQ())
}
return &bq.TableConstraintsForeignKeys{
Name: fk.Name,
ReferencedTable: &bq.TableConstraintsForeignKeysReferencedTable{
DatasetId: fk.ReferencedTable.DatasetID,
ProjectId: fk.ReferencedTable.ProjectID,
TableId: fk.ReferencedTable.TableID,
},
ColumnReferences: colRefs,
}
}

func bqToForeignKeys(tc *bq.TableConstraints, c *Client) []*ForeignKey {
fks := []*ForeignKey{}
for _, fk := range tc.ForeignKeys {
colRefs := []*ColumnReference{}
for _, colRef := range fk.ColumnReferences {
colRefs = append(colRefs, &ColumnReference{
ReferencedColumn: colRef.ReferencedColumn,
ReferencingColumn: colRef.ReferencingColumn,
})
}
fks = append(fks, &ForeignKey{
Name: fk.Name,
ReferencedTable: c.DatasetInProject(fk.ReferencedTable.DatasetId, fk.ReferencedTable.ProjectId).Table(fk.ReferencedTable.TableId),
ColumnReferences: colRefs,
})
}
return fks
}

// ColumnReference represents the pair of the foreign key column and primary key column.
type ColumnReference struct {
// ReferencingColumn is the column in the current table that composes the foreign key.
ReferencingColumn string
// ReferencedColumn is the column in the primary key of the foreign table that
// is referenced by the ReferencingColumn.
ReferencedColumn string
}

func (colRef *ColumnReference) toBQ() *bq.TableConstraintsForeignKeysColumnReferences {
return &bq.TableConstraintsForeignKeysColumnReferences{
ReferencedColumn: colRef.ReferencedColumn,
ReferencingColumn: colRef.ReferencingColumn,
}
}

// TableCreateDisposition specifies the circumstances under which destination table will be created.
Expand Down Expand Up @@ -675,6 +768,19 @@ func (tm *TableMetadata) toBQ() (*bq.Table, error) {
return nil, errors.New("cannot set ETag on create")
}
t.DefaultCollation = string(tm.DefaultCollation)

if tm.PrimaryKey != nil || len(tm.ForeignKeys) > 0 {
t.TableConstraints = &bq.TableConstraints{}
if tm.PrimaryKey != nil {
t.TableConstraints.PrimaryKey = tm.PrimaryKey.toBQ()
}
if len(tm.ForeignKeys) > 0 {
t.TableConstraints.ForeignKeys = make([]*bq.TableConstraintsForeignKeys, len(tm.ForeignKeys))
for i, fk := range tm.ForeignKeys {
t.TableConstraints.ForeignKeys[i] = fk.toBQ()
}
}
}
return t, nil
}

Expand Down Expand Up @@ -788,6 +894,10 @@ func bqToTableMetadata(t *bq.Table, c *Client) (*TableMetadata, error) {
}
md.ExternalDataConfig = edc
}
if t.TableConstraints != nil {
md.PrimaryKey = bqToPrimaryKey(t.TableConstraints)
md.ForeignKeys = bqToForeignKeys(t.TableConstraints, c)
}
return md, nil
}

Expand Down Expand Up @@ -947,6 +1057,18 @@ func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) {
t.DefaultCollation = optional.ToString(tm.DefaultCollation)
forceSend("DefaultCollation")
}
if tm.PrimaryKey != nil || len(tm.ForeignKeys) > 0 {
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
t.TableConstraints = &bq.TableConstraints{}
if tm.PrimaryKey != nil {
t.TableConstraints.PrimaryKey = tm.PrimaryKey.toBQ()
}
if len(tm.ForeignKeys) > 0 {
t.TableConstraints.ForeignKeys = make([]*bq.TableConstraintsForeignKeys, len(tm.ForeignKeys))
for i, fk := range tm.ForeignKeys {
t.TableConstraints.ForeignKeys[i] = fk.toBQ()
}
}
}
labels, forces, nulls := tm.update()
t.Labels = labels
t.ForceSendFields = append(t.ForceSendFields, forces...)
Expand Down Expand Up @@ -1024,6 +1146,15 @@ type TableMetadataToUpdate struct {
// in the table.
DefaultCollation optional.String

// Optionally specifies a PrimaryKey constraint on a table's columns.
// Updated only if present.
// The primary key is not enforced.
PrimaryKey *PrimaryKey

// ForeignKeys represent a list of foreign keys constraints.
// The foreign key is not enforced.
alvarowolfx marked this conversation as resolved.
Show resolved Hide resolved
ForeignKeys []*ForeignKey

labelUpdater
}

Expand Down
150 changes: 150 additions & 0 deletions bigquery/table_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,3 +661,153 @@ func TestIntegration_TableDefaultCollation(t *testing.T) {
}
}
}

func TestIntegration_TableConstraintsPK(t *testing.T) {
// Test Primary Keys for Table.Create and Table.Update
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()
table := dataset.Table(tableIDs.New())
err := table.Create(context.Background(), &TableMetadata{
Schema: schema,
PrimaryKey: &PrimaryKey{
Columns: []string{"name"},
},
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer table.Delete(ctx)
md, err := table.Metadata(ctx)
if err != nil {
t.Fatal(err)
}
if md.PrimaryKey.Columns[0] != "name" {
t.Fatalf("expected table primary key to contain column `name`, but found %q", md.PrimaryKey.Columns)
}

tableNoPK := dataset.Table(tableIDs.New())
err = tableNoPK.Create(context.Background(), &TableMetadata{
Schema: schema,
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer tableNoPK.Delete(ctx)
md, err = tableNoPK.Metadata(ctx)
if err != nil {
t.Fatal(err)
}
if md.PrimaryKey != nil {
t.Fatalf("expected table to not have a PK, but found %v", md.PrimaryKey.Columns)
}

md, err = tableNoPK.Update(ctx, TableMetadataToUpdate{
PrimaryKey: &PrimaryKey{
Columns: []string{"name"},
},
}, "")
if err != nil {
t.Fatal(err)
}
if md.PrimaryKey == nil || md.PrimaryKey.Columns[0] != "name" {
t.Fatalf("expected table primary key to contain column `name`, but found %v", md.PrimaryKey)
}
}

func TestIntegration_TableConstraintsFK(t *testing.T) {
// Test Foreign keys for Table.Create and Table.Update
if client == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()
tableA := dataset.Table(tableIDs.New())
schemaA := []*FieldSchema{
{Name: "id", Type: IntegerFieldType},
{Name: "name", Type: StringFieldType},
}
err := tableA.Create(context.Background(), &TableMetadata{
Schema: schemaA,
PrimaryKey: &PrimaryKey{
Columns: []string{"id"},
},
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer tableA.Delete(ctx)

tableB := dataset.Table(tableIDs.New())
schemaB := []*FieldSchema{
{Name: "id", Type: IntegerFieldType},
{Name: "name", Type: StringFieldType},
{Name: "parent", Type: IntegerFieldType},
}
err = tableB.Create(context.Background(), &TableMetadata{
Schema: schemaB,
PrimaryKey: &PrimaryKey{
Columns: []string{"id"},
},
ForeignKeys: []*ForeignKey{
{
Name: "table_a_fk",
ReferencedTable: tableA,
ColumnReferences: []*ColumnReference{
{
ReferencingColumn: "parent",
ReferencedColumn: "id",
},
},
},
},
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer tableB.Delete(ctx)
md, err := tableB.Metadata(ctx)
if err != nil {
t.Fatal(err)
}
if len(md.ForeignKeys) == 0 || md.ForeignKeys[0].Name != "table_a_fk" {
t.Fatalf("expected table to contains fk `self`, but found %v", md.ForeignKeys)
}

tableNoFK := dataset.Table(tableIDs.New())
err = tableNoFK.Create(context.Background(), &TableMetadata{
Schema: schemaB,
PrimaryKey: &PrimaryKey{
Columns: []string{"id"},
},
ExpirationTime: testTableExpiration,
})
if err != nil {
t.Fatal(err)
}
defer tableNoFK.Delete(ctx)
md, err = tableNoFK.Update(ctx, TableMetadataToUpdate{
ForeignKeys: []*ForeignKey{
{
Name: "table_a_fk",
ReferencedTable: tableA,
ColumnReferences: []*ColumnReference{
{
ReferencedColumn: "id",
ReferencingColumn: "parent",
},
},
},
},
}, "")
if err != nil {
t.Fatal(err)
}
if len(md.ForeignKeys) == 0 || md.ForeignKeys[0].Name != "table_a_fk" {
t.Fatalf("expected table to contains fk `self`, but found %v", md.ForeignKeys)
}
}
59 changes: 59 additions & 0 deletions bigquery/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func TestBQToTableMetadata(t *testing.T) {
ExternalDataConfiguration: &bq.ExternalDataConfiguration{
SourceFormat: "GOOGLE_SHEETS",
},
TableConstraints: &bq.TableConstraints{
PrimaryKey: &bq.TableConstraintsPrimaryKey{
Columns: []string{"id"},
},
},
},
&TableMetadata{
Description: "desc",
Expand Down Expand Up @@ -110,6 +115,10 @@ func TestBQToTableMetadata(t *testing.T) {
},
EncryptionConfig: &EncryptionConfig{KMSKeyName: "keyName"},
ETag: "etag",
PrimaryKey: &PrimaryKey{
Columns: []string{"id"},
},
ForeignKeys: []*ForeignKey{},
},
},
} {
Expand Down Expand Up @@ -405,6 +414,56 @@ func TestTableMetadataToUpdateToBQ(t *testing.T) {
Clustering: &bq.Clustering{Fields: []string{"foo", "bar"}},
},
},
{
tm: TableMetadataToUpdate{PrimaryKey: &PrimaryKey{Columns: []string{"name"}}},
want: &bq.Table{
TableConstraints: &bq.TableConstraints{
PrimaryKey: &bq.TableConstraintsPrimaryKey{
Columns: []string{"name"},
},
},
},
},
{
tm: TableMetadataToUpdate{
ForeignKeys: []*ForeignKey{
{
Name: "fk",
ReferencedTable: &Table{
ProjectID: "projectID",
DatasetID: "datasetID",
TableID: "tableID",
},
ColumnReferences: []*ColumnReference{
{
ReferencedColumn: "id",
ReferencingColumn: "other_table_id",
},
},
},
},
},
want: &bq.Table{
TableConstraints: &bq.TableConstraints{
ForeignKeys: []*bq.TableConstraintsForeignKeys{
{
Name: "fk",
ReferencedTable: &bq.TableConstraintsForeignKeysReferencedTable{
ProjectId: "projectID",
DatasetId: "datasetID",
TableId: "tableID",
},
ColumnReferences: []*bq.TableConstraintsForeignKeysColumnReferences{
{
ReferencedColumn: "id",
ReferencingColumn: "other_table_id",
},
},
},
},
},
},
},
} {
got, _ := test.tm.toBQ()
if !testutil.Equal(got, test.want) {
Expand Down