diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 99d78853211..f2034978fb5 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -29,6 +29,7 @@ import ( "testing" "time" + connection "cloud.google.com/go/bigquery/connection/apiv1" "cloud.google.com/go/civil" datacatalog "cloud.google.com/go/datacatalog/apiv1" "cloud.google.com/go/httpreplay" @@ -54,6 +55,7 @@ var record = flag.Bool("record", false, "record RPCs") var ( client *Client storageClient *storage.Client + connectionsClient *connection.Client policyTagManagerClient *datacatalog.PolicyTagManagerClient dataset *Dataset otherDataset *Dataset @@ -123,6 +125,10 @@ func initIntegrationTest() func() { if err != nil { log.Fatal(err) } + connectionsClient, err = connection.NewClient(ctx, option.WithHTTPClient(hc)) + if err != nil { + log.Fatal(err) + } policyTagManagerClient, err = datacatalog.NewPolicyTagManagerClient(ctx) if err != nil { log.Fatal(err) @@ -140,6 +146,7 @@ func initIntegrationTest() func() { } client = nil storageClient = nil + connectionsClient = nil return func() {} default: // Run integration tests against a real backend. @@ -203,6 +210,10 @@ func initIntegrationTest() func() { if err != nil { log.Fatalf("datacatalog.NewPolicyTagManagerClient: %v", err) } + connectionsClient, err = connection.NewClient(ctx, sOpts...) + if err != nil { + log.Fatalf("connection.NewService: %v", err) + } c := initTestState(client, now) return func() { c(); cleanup() } } diff --git a/bigquery/routine.go b/bigquery/routine.go index 2185a146897..2274f33fe36 100644 --- a/bigquery/routine.go +++ b/bigquery/routine.go @@ -163,6 +163,15 @@ const ( NotDeterministic RoutineDeterminism = "NOT_DETERMINISTIC" ) +const ( + // ScalarFunctionRoutine scalar function routine type + ScalarFunctionRoutine = "SCALAR_FUNCTION" + // ProcedureRoutine procedure routine type + ProcedureRoutine = "PROCEDURE" + // TableValuedFunctionRoutine routine type for table valued functions + TableValuedFunctionRoutine = "TABLE_VALUED_FUNCTION" +) + // RoutineMetadata represents details of a given BigQuery Routine. type RoutineMetadata struct { ETag string @@ -177,7 +186,11 @@ type RoutineMetadata struct { // Language of the routine, such as SQL or JAVASCRIPT. Language string // The list of arguments for the the routine. - Arguments []*RoutineArgument + Arguments []*RoutineArgument + + // Information for a remote user-defined function. + RemoteFunctionOptions *RemoteFunctionOptions + ReturnType *StandardSQLDataType // Set only if the routine type is TABLE_VALUED_FUNCTION. @@ -195,6 +208,66 @@ type RoutineMetadata struct { Body string } +// RemoteFunctionOptions contains information for a remote user-defined function. +type RemoteFunctionOptions struct { + + // Fully qualified name of the user-provided connection object which holds + // the authentication information to send requests to the remote service. + // Format: + // projects/{projectId}/locations/{locationId}/connections/{connectionId} + Connection string + + // Endpoint of the user-provided remote service (e.g. a function url in + // Google Cloud Function or Cloud Run ) + Endpoint string + + // Max number of rows in each batch sent to the remote service. + // If absent or if 0, it means no limit. + MaxBatchingRows int64 + + // User-defined context as a set of key/value pairs, + // which will be sent as function invocation context together with + // batched arguments in the requests to the remote service. The total + // number of bytes of keys and values must be less than 8KB. + UserDefinedContext map[string]string +} + +func bqToRemoteFunctionOptions(in *bq.RemoteFunctionOptions) (*RemoteFunctionOptions, error) { + if in == nil { + return nil, nil + } + rfo := &RemoteFunctionOptions{ + Connection: in.Connection, + Endpoint: in.Endpoint, + MaxBatchingRows: in.MaxBatchingRows, + } + if in.UserDefinedContext != nil { + rfo.UserDefinedContext = make(map[string]string) + for k, v := range in.UserDefinedContext { + rfo.UserDefinedContext[k] = v + } + } + return rfo, nil +} + +func (rfo *RemoteFunctionOptions) toBQ() (*bq.RemoteFunctionOptions, error) { + if rfo == nil { + return nil, nil + } + r := &bq.RemoteFunctionOptions{ + Connection: rfo.Connection, + Endpoint: rfo.Endpoint, + MaxBatchingRows: rfo.MaxBatchingRows, + } + if rfo.UserDefinedContext != nil { + r.UserDefinedContext = make(map[string]string) + for k, v := range rfo.UserDefinedContext { + r.UserDefinedContext[k] = v + } + } + return r, nil +} + func (rm *RoutineMetadata) toBQ() (*bq.Routine, error) { r := &bq.Routine{} if rm == nil { @@ -227,6 +300,13 @@ func (rm *RoutineMetadata) toBQ() (*bq.Routine, error) { } r.Arguments = args r.ImportedLibraries = rm.ImportedLibraries + if rm.RemoteFunctionOptions != nil { + rfo, err := rm.RemoteFunctionOptions.toBQ() + if err != nil { + return nil, err + } + r.RemoteFunctionOptions = rfo + } if !rm.CreationTime.IsZero() { return nil, errors.New("cannot set CreationTime on create") } @@ -436,6 +516,11 @@ func bqToRoutineMetadata(r *bq.Routine) (*RoutineMetadata, error) { return nil, err } meta.ReturnType = ret + rfo, err := bqToRemoteFunctionOptions(r.RemoteFunctionOptions) + if err != nil { + return nil, err + } + meta.RemoteFunctionOptions = rfo tt, err := bqToStandardSQLTableType(r.ReturnTableType) if err != nil { return nil, err diff --git a/bigquery/routine_integration_test.go b/bigquery/routine_integration_test.go index 1a8a51b0dcf..0208ec32a66 100644 --- a/bigquery/routine_integration_test.go +++ b/bigquery/routine_integration_test.go @@ -21,6 +21,7 @@ import ( "cloud.google.com/go/internal/testutil" "google.golang.org/api/iterator" + "google.golang.org/genproto/googleapis/cloud/bigquery/connection/v1" ) func TestIntegration_RoutineScalarUDF(t *testing.T) { @@ -88,6 +89,85 @@ func TestIntegration_RoutineJSUDF(t *testing.T) { } } +func TestIntegration_RoutineRemoteUDF(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + + routineID := routineIDs.New() + routine := dataset.Routine(routineID) + uri := "https://aaabbbccc-uc.a.run.app" + + connectionLocation := fmt.Sprintf("projects/%s/locations/%s", dataset.ProjectID, "us") + connectionName := fmt.Sprintf("udf_conn%s", routineID) + cleanupConnection, connectionID, err := createConnection(ctx, t, connectionLocation, connectionName) + if err != nil { + t.Fatal(err) + } + defer cleanupConnection() + + remoteOpts := &RemoteFunctionOptions{ + Endpoint: uri, + Connection: connectionID, + MaxBatchingRows: 50, + UserDefinedContext: map[string]string{"foo": "bar"}, + } + meta := &RoutineMetadata{ + RemoteFunctionOptions: remoteOpts, + Description: "defines a remote function", + Type: ScalarFunctionRoutine, + ReturnType: &StandardSQLDataType{ + TypeKind: "STRING", + }, + } + if err := routine.Create(ctx, meta); err != nil { + t.Fatalf("routine.Create: %v", err) + } + + gotMeta, err := routine.Metadata(ctx) + if err != nil { + t.Fatalf("routine.Metadata: %v", err) + } + + if diff := testutil.Diff(gotMeta.RemoteFunctionOptions, remoteOpts); diff != "" { + t.Fatalf("RemoteFunctionOptions: -got, +want:\n%s", diff) + } +} + +func createConnection(ctx context.Context, t *testing.T, parent, name string) (cleanup func(), connectionID string, err error) { + fullname := fmt.Sprintf("%s/connections/%s", parent, name) + conn, err := connectionsClient.CreateConnection(ctx, &connection.CreateConnectionRequest{ + Parent: parent, + ConnectionId: name, + Connection: &connection.Connection{ + FriendlyName: name, + Properties: &connection.Connection_CloudResource{ + CloudResource: &connection.CloudResourceProperties{}, + }, + }, + }) + if err != nil { + return + } + conn, err = connectionsClient.GetConnection(ctx, &connection.GetConnectionRequest{ + Name: fullname, + }) + if err != nil { + return + } + cleanup = func() { + err := connectionsClient.DeleteConnection(ctx, &connection.DeleteConnectionRequest{ + Name: fullname, + }) + if err != nil { + t.Logf("could not delete connection: %s", fullname) + } + } + connectionID = conn.Name + return +} + func TestIntegration_RoutineComplexTypes(t *testing.T) { if client == nil { t.Skip("Integration tests skipped")