Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local file-based name resolver with SQLite #3178

Merged
merged 21 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 13 additions & 3 deletions internal/authentication/sqlite/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"strings"
"time"

// Blank import for the sqlite driver
_ "modernc.org/sqlite"

"github.com/dapr/kit/logger"
)

Expand All @@ -44,6 +47,7 @@ func (m *SqliteAuthMetadata) Reset() {
m.DisableWAL = false
}

// Validate the auth metadata and returns an error if it's not valid.
func (m *SqliteAuthMetadata) Validate() error {
// Validate and sanitize input
if m.ConnectionString == "" {
Expand All @@ -60,10 +64,16 @@ func (m *SqliteAuthMetadata) Validate() error {
return nil
}

// IsInMemoryDB returns true if the connection string is for an in-memory database.
func (m SqliteAuthMetadata) IsInMemoryDB() bool {
lc := strings.ToLower(m.ConnectionString)
return strings.HasPrefix(lc, ":memory:") || strings.HasPrefix(lc, "file::memory:")
}

// GetConnectionString returns the parsed connection string.
func (m *SqliteAuthMetadata) GetConnectionString(log logger.Logger) (string, error) {
// Check if we're using the in-memory database
lc := strings.ToLower(m.ConnectionString)
isMemoryDB := strings.HasPrefix(lc, ":memory:") || strings.HasPrefix(lc, "file::memory:")
isMemoryDB := m.IsInMemoryDB()

// Get the "query string" from the connection string if present
idx := strings.IndexRune(m.ConnectionString, '?')
Expand Down Expand Up @@ -151,7 +161,7 @@ func (m *SqliteAuthMetadata) GetConnectionString(log logger.Logger) (string, err
connString += "?" + qs.Encode()

// If the connection string doesn't begin with "file:", add the prefix
if !strings.HasPrefix(lc, "file:") {
if !strings.HasPrefix(strings.ToLower(m.ConnectionString), "file:") {
log.Debug("prefix 'file:' added to the connection string")
connString = "file:" + connString
}
Expand Down
2 changes: 1 addition & 1 deletion internal/component/sql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func Migrate(ctx context.Context, db DatabaseConn, opts MigrationOptions) error
return fmt.Errorf("invalid migration level found in metadata table: %s", migrationLevelStr)
}
}
opts.Logger.Debug("Migrate: current migration level: %d", migrationLevel)
opts.Logger.Debugf("Migrate: current migration level: %d", migrationLevel)

// Perform the migrations
for i := migrationLevel; i < len(opts.Migrations); i++ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ func (m Migrations) Perform(ctx context.Context, migrationFns []sqlinternal.Migr
defer func() {
m.Logger.Debug("Releasing advisory lock")
queryCtx, cancel = context.WithTimeout(ctx, time.Minute)
_, err = m.DB.Exec(queryCtx, "SELECT pg_advisory_unlock($1)", lockID)
_, rollbackErr := m.DB.Exec(queryCtx, "SELECT pg_advisory_unlock($1)", lockID)
cancel()
if err != nil {
if rollbackErr != nil {
// Panicking here, as this forcibly closes the session and thus ensures we are not leaving locks hanging around
m.Logger.Fatalf("Failed to release advisory lock: %v", err)
m.Logger.Fatalf("Failed to release advisory lock: %v", rollbackErr)
}
}()

Expand Down
8 changes: 4 additions & 4 deletions internal/component/sql/migrations/sqlite/sqlite_migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ func (m *Migrations) Perform(ctx context.Context, migrationFns []sqlinternal.Mig
return
}
queryCtx, cancel = context.WithTimeout(ctx, time.Minute)
_, err = m.conn.ExecContext(queryCtx, "ROLLBACK TRANSACTION")
_, rollbackErr := m.conn.ExecContext(queryCtx, "ROLLBACK TRANSACTION")
cancel()
if err != nil {
// Panicking here, as this forcibly closes the session and thus ensures we are not leaving locks hanging around
m.Logger.Fatalf("Failed to rollback transaction: %v", err)
if rollbackErr != nil {
// Panicking here, as this forcibly closes the session and thus ensures we are not leaving transactions open
m.Logger.Fatalf("Failed to rollback transaction: %v", rollbackErr)
}
}()

Expand Down
25 changes: 15 additions & 10 deletions nameresolution/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package consul

import (
"context"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -238,7 +239,7 @@ func newResolver(logger logger.Logger, resolverConfig resolverConfig, client cli
}

// Init will configure component. It will also register service or validate client connection based on config.
func (r *resolver) Init(metadata nr.Metadata) (err error) {
func (r *resolver) Init(ctx context.Context, metadata nr.Metadata) (err error) {
r.config, err = getConfig(metadata)
if err != nil {
return err
Expand Down Expand Up @@ -274,7 +275,7 @@ func (r *resolver) Init(metadata nr.Metadata) (err error) {
}

// ResolveID resolves name to address via consul.
func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) {
func (r *resolver) ResolveID(ctx context.Context, req nr.ResolveRequest) (addr string, err error) {
cfg := r.config
svc, err := r.getService(req.ID)
if err != nil {
Expand Down Expand Up @@ -327,7 +328,8 @@ func formatAddress(address string, port string) (addr string, err error) {

// getConfig configuration from metadata, defaults are best suited for self-hosted mode.
func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
if metadata.Properties[nr.DaprPort] == "" {
props := metadata.GetPropertiesMap()
if props[nr.DaprPort] == "" {
return resolverCfg, fmt.Errorf("metadata property missing: %s", nr.DaprPort)
}

Expand All @@ -341,7 +343,7 @@ func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
resolverCfg.UseCache = cfg.UseCache

resolverCfg.Client = getClientConfig(cfg)
resolverCfg.Registration, err = getRegistrationConfig(cfg, metadata.Properties)
resolverCfg.Registration, err = getRegistrationConfig(cfg, props)
if err != nil {
return resolverCfg, err
}
Expand All @@ -353,7 +355,7 @@ func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
resolverCfg.Registration.Meta = map[string]string{}
}

resolverCfg.Registration.Meta[resolverCfg.DaprPortMetaKey] = metadata.Properties[nr.DaprPort]
resolverCfg.Registration.Meta[resolverCfg.DaprPortMetaKey] = props[nr.DaprPort]
}

return resolverCfg, nil
Expand Down Expand Up @@ -382,22 +384,25 @@ func getRegistrationConfig(cfg configSpec, props map[string]string) (*consul.Age
appPort string
host string
httpPort string
ok bool
)

if appID, ok = props[nr.AppID]; !ok {
appID = props[nr.AppID]
if appID == "" {
return nil, fmt.Errorf("metadata property missing: %s", nr.AppID)
}

if appPort, ok = props[nr.AppPort]; !ok {
appPort = props[nr.AppPort]
if appPort == "" {
return nil, fmt.Errorf("metadata property missing: %s", nr.AppPort)
}

if host, ok = props[nr.HostAddress]; !ok {
host = props[nr.HostAddress]
if host == "" {
return nil, fmt.Errorf("metadata property missing: %s", nr.HostAddress)
}

if httpPort, ok = props[nr.DaprHTTPPort]; !ok {
httpPort = props[nr.DaprHTTPPort]
if httpPort == "" {
return nil, fmt.Errorf("metadata property missing: %s", nr.DaprHTTPPort)
} else if _, err := strconv.ParseUint(httpPort, 10, 0); err != nil {
return nil, fmt.Errorf("error parsing %s: %w", nr.DaprHTTPPort, err)
Expand Down