Skip to content

Commit

Permalink
[extension/storage/dbstorage] Create component (#7061)
Browse files Browse the repository at this point in the history
* add dbstorage extension

* lint

* fix import

* make extensions test pass

* goimports

* add CODEOWNERS

* address code review

* code review - add missing link
  • Loading branch information
atoulme committed Jan 12, 2022
1 parent 0bdb885 commit 772f39c
Show file tree
Hide file tree
Showing 17 changed files with 813 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Expand Up @@ -65,6 +65,7 @@ extension/observer/ @open-telemetry/collector-c
extension/observer/ecstaskobserver/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick
extension/observer/k8sobserver/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick @dmitryax
extension/oidcauthextension/ @open-telemetry/collector-contrib-approvers @jpkrohling
extension/storage/dbstorage/ @open-telemetry/collector-contrib-approvers @dmitryax @atoulme
extension/storage/filestorage/ @open-telemetry/collector-contrib-approvers @djaglowski

internal/aws/ @open-telemetry/collector-contrib-approvers @anuraaga @mxiamxia
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -52,6 +52,7 @@
- `ecs_task_observer`: Discover running containers in AWS ECS tasks (#6894)
- `mongodbreceiver`: Establish codebase for MongoDB metrics receiver (#6972)
- `couchbasereceiver`: Establish codebase for Couchbase metrics receiver (#7046)
- `dbstorage`: New experimental dbstorage extension (#7061)

## 🧰 Bug fixes 🧰

Expand Down
9 changes: 9 additions & 0 deletions cmd/configschema/go.mod
Expand Up @@ -150,6 +150,14 @@ require (
github.com/influxdata/influxdb-observability/influx2otel v0.2.10 // indirect
github.com/influxdata/influxdb-observability/otel2influx v0.2.10 // indirect
github.com/influxdata/line-protocol/v2 v2.2.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.10.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.2.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.9.1 // indirect
github.com/jackc/pgx/v4 v4.14.1 // indirect
github.com/jaegertracing/jaeger v1.29.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
Expand All @@ -173,6 +181,7 @@ require (
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-sqlite3 v1.14.10 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/microsoft/ApplicationInsights-Go v0.4.4 // indirect
github.com/miekg/dns v1.1.43 // indirect
Expand Down
69 changes: 69 additions & 0 deletions cmd/configschema/go.sum

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions extension/storage/dbstorage/README.md
@@ -0,0 +1,38 @@
# Database Storage

> :construction: This extension is in alpha. Configuration and functionality are subject to change.
The Database Storage extension can persist state to a relational database.

The extension requires read and write access to a database table.

`driver`: the name of the database driver to use. By default, the storage client supports "sqlite3" and "pgx".

Implementors can add additional driver support by importing SQL drivers into the program.
See [Golang database/sql package documentation](https://pkg.go.dev/database/sql) for more information.

`datasource`: the url of the database, in the format accepted by the driver.


```
extensions:
db_storage:
driver: "sqlite3"
datasource: "foo.db?_busy_timeout=10000&_journal=WAL&_sync=NORMAL"
service:
extensions: [db_storage]
pipelines:
traces:
receivers: [nop]
processors: [nop]
exporters: [nop]
# Data pipeline is required to load the config.
receivers:
nop:
processors:
nop:
exporters:
nop:
```
130 changes: 130 additions & 0 deletions extension/storage/dbstorage/client.go
@@ -0,0 +1,130 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbstorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/dbstorage"

import (
"context"
"database/sql"
"errors"
"fmt"

// Postgres driver
_ "github.com/jackc/pgx/v4/stdlib"
// SQLite driver
_ "github.com/mattn/go-sqlite3"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

const (
createTable = "create table if not exists %s (key text primary key, value blob)"
getQueryText = "select value from %s where key=?"
setQueryText = "insert into %s(key, value) values(?,?) on conflict(key) do update set value=?"
deleteQueryText = "delete from %s where key=?"
)

type dbStorageClient struct {
db *sql.DB
getQuery *sql.Stmt
setQuery *sql.Stmt
deleteQuery *sql.Stmt
}

func newClient(ctx context.Context, db *sql.DB, tableName string) (*dbStorageClient, error) {
var err error
_, err = db.ExecContext(ctx, fmt.Sprintf(createTable, tableName))
if err != nil {
return nil, err
}

selectQuery, err := db.PrepareContext(ctx, fmt.Sprintf(getQueryText, tableName))
if err != nil {
return nil, err
}
setQuery, err := db.PrepareContext(ctx, fmt.Sprintf(setQueryText, tableName))
if err != nil {
return nil, err
}
deleteQuery, err := db.PrepareContext(ctx, fmt.Sprintf(deleteQueryText, tableName))
if err != nil {
return nil, err
}
return &dbStorageClient{db, selectQuery, setQuery, deleteQuery}, nil
}

// Get will retrieve data from storage that corresponds to the specified key
func (c *dbStorageClient) Get(ctx context.Context, key string) ([]byte, error) {
rows, err := c.getQuery.QueryContext(ctx, key)
if err != nil {
return nil, err
}
if !rows.Next() {
return nil, nil
}
var result []byte
err = rows.Scan(&result)
if err != nil {
return result, err
}
err = rows.Close()
return result, err
}

// Set will store data. The data can be retrieved using the same key
func (c *dbStorageClient) Set(ctx context.Context, key string, value []byte) error {
_, err := c.setQuery.ExecContext(ctx, key, value, value)
return err
}

// Delete will delete data associated with the specified key
func (c *dbStorageClient) Delete(ctx context.Context, key string) error {
_, err := c.deleteQuery.ExecContext(ctx, key)
return err
}

// Batch executes the specified operations in order. Get operation results are updated in place
func (c *dbStorageClient) Batch(ctx context.Context, ops ...storage.Operation) error {
var err error
for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value, err = c.Get(ctx, op.Key)
case storage.Set:
err = c.Set(ctx, op.Key, op.Value)
case storage.Delete:
err = c.Delete(ctx, op.Key)
default:
return errors.New("wrong operation type")
}

if err != nil {
return err
}
}
return err
}

// Close will close the database
func (c *dbStorageClient) Close(_ context.Context) error {
if err := c.setQuery.Close(); err != nil {
return err
}
if err := c.deleteQuery.Close(); err != nil {
return err
}
if err := c.getQuery.Close(); err != nil {
return err
}
return nil
}
39 changes: 39 additions & 0 deletions extension/storage/dbstorage/config.go
@@ -0,0 +1,39 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbstorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/dbstorage"

import (
"fmt"

"go.opentelemetry.io/collector/config"
)

// Config defines configuration for dbstorage extension.
type Config struct {
config.ExtensionSettings `mapstructure:",squash"`
DriverName string `mapstructure:"driver,omitempty"`
DataSource string `mapstructure:"datasource,omitempty"`
}

func (cfg *Config) Validate() error {
if cfg.DataSource == "" {
return fmt.Errorf(fmt.Sprintf("missing datasource for %s", cfg.ID()))
}
if cfg.DriverName == "" {
return fmt.Errorf(fmt.Sprintf("missing driver name for %s", cfg.ID()))
}

return nil
}
55 changes: 55 additions & 0 deletions extension/storage/dbstorage/config_test.go
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbstorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/dbstorage"
import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestConfig_Validate(t *testing.T) {
tests := []struct {
name string
config Config
errWanted error
}{
{
"Missing driver name",
Config{DataSource: "foo"},
errors.New("missing driver name for /blah"),
},
{
"Missing datasource",
Config{DriverName: "foo"},
errors.New("missing datasource for /blah"),
},
{
"valid",
Config{DriverName: "foo", DataSource: "bar"},
nil,
},
}

for _, test := range tests {
test.config.SetIDName("blah")
err := test.config.Validate()
if test.errWanted == nil {
assert.NoError(t, err)
} else {
assert.Equal(t, test.errWanted, err)
}
}
}
91 changes: 91 additions & 0 deletions extension/storage/dbstorage/extension.go
@@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbstorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/dbstorage"

import (
"context"
"database/sql"
"fmt"
"strings"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.uber.org/zap"
)

type databaseStorage struct {
driverName string
datasourceName string
logger *zap.Logger
db *sql.DB
}

// Ensure this storage extension implements the appropriate interface
var _ storage.Extension = (*databaseStorage)(nil)

func newDBStorage(logger *zap.Logger, config *Config) (component.Extension, error) {
return &databaseStorage{
driverName: config.DriverName,
datasourceName: config.DataSource,
logger: logger,
}, nil
}

// Start opens a connection to the database
func (ds *databaseStorage) Start(context.Context, component.Host) error {
db, err := sql.Open(ds.driverName, ds.datasourceName)
if err != nil {
return err
}

if err := db.Ping(); err != nil {
return err
}
ds.db = db
return nil
}

// Shutdown closes the connection to the database
func (ds *databaseStorage) Shutdown(context.Context) error {
return ds.db.Close()
}

// GetClient returns a storage client for an individual component
func (ds *databaseStorage) GetClient(ctx context.Context, kind component.Kind, ent config.ComponentID, name string) (storage.Client, error) {
var fullName string
if name == "" {
fullName = fmt.Sprintf("%s_%s_%s", kindString(kind), ent.Type(), ent.Name())
} else {
fullName = fmt.Sprintf("%s_%s_%s_%s", kindString(kind), ent.Type(), ent.Name(), name)
}
fullName = strings.ReplaceAll(fullName, " ", "")
return newClient(ctx, ds.db, fullName)
}

func kindString(k component.Kind) string {
switch k {
case component.KindReceiver:
return "receiver"
case component.KindProcessor:
return "processor"
case component.KindExporter:
return "exporter"
case component.KindExtension:
return "extension"
default:
return "other" // not expected
}
}

0 comments on commit 772f39c

Please sign in to comment.