Skip to content

Commit

Permalink
Local file-based name resolver with SQLite (#3178)
Browse files Browse the repository at this point in the history
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Josh van Leeuwen <me@joshvanl.dev>
Co-authored-by: Bernd Verst <github@bernd.dev>
  • Loading branch information
3 people committed Nov 1, 2023
1 parent ef68cb6 commit 8680e27
Show file tree
Hide file tree
Showing 19 changed files with 974 additions and 296 deletions.
16 changes: 13 additions & 3 deletions internal/authentication/sqlite/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"strings"
"time"

// Blank import for the sqlite driver
_ "modernc.org/sqlite"

"github.com/dapr/kit/logger"
)

Expand All @@ -44,6 +47,7 @@ func (m *SqliteAuthMetadata) Reset() {
m.DisableWAL = false
}

// Validate the auth metadata and returns an error if it's not valid.
func (m *SqliteAuthMetadata) Validate() error {
// Validate and sanitize input
if m.ConnectionString == "" {
Expand All @@ -60,10 +64,16 @@ func (m *SqliteAuthMetadata) Validate() error {
return nil
}

// IsInMemoryDB returns true if the connection string is for an in-memory database.
func (m SqliteAuthMetadata) IsInMemoryDB() bool {
lc := strings.ToLower(m.ConnectionString)
return strings.HasPrefix(lc, ":memory:") || strings.HasPrefix(lc, "file::memory:")
}

// GetConnectionString returns the parsed connection string.
func (m *SqliteAuthMetadata) GetConnectionString(log logger.Logger) (string, error) {
// Check if we're using the in-memory database
lc := strings.ToLower(m.ConnectionString)
isMemoryDB := strings.HasPrefix(lc, ":memory:") || strings.HasPrefix(lc, "file::memory:")
isMemoryDB := m.IsInMemoryDB()

// Get the "query string" from the connection string if present
idx := strings.IndexRune(m.ConnectionString, '?')
Expand Down Expand Up @@ -151,7 +161,7 @@ func (m *SqliteAuthMetadata) GetConnectionString(log logger.Logger) (string, err
connString += "?" + qs.Encode()

// If the connection string doesn't begin with "file:", add the prefix
if !strings.HasPrefix(lc, "file:") {
if !strings.HasPrefix(strings.ToLower(m.ConnectionString), "file:") {
log.Debug("prefix 'file:' added to the connection string")
connString = "file:" + connString
}
Expand Down
2 changes: 1 addition & 1 deletion internal/component/sql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func Migrate(ctx context.Context, db DatabaseConn, opts MigrationOptions) error
return fmt.Errorf("invalid migration level found in metadata table: %s", migrationLevelStr)
}
}
opts.Logger.Debug("Migrate: current migration level: %d", migrationLevel)
opts.Logger.Debugf("Migrate: current migration level: %d", migrationLevel)

// Perform the migrations
for i := migrationLevel; i < len(opts.Migrations); i++ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ func (m Migrations) Perform(ctx context.Context, migrationFns []sqlinternal.Migr
defer func() {
m.Logger.Debug("Releasing advisory lock")
queryCtx, cancel = context.WithTimeout(ctx, time.Minute)
_, err = m.DB.Exec(queryCtx, "SELECT pg_advisory_unlock($1)", lockID)
_, rollbackErr := m.DB.Exec(queryCtx, "SELECT pg_advisory_unlock($1)", lockID)
cancel()
if err != nil {
if rollbackErr != nil {
// Panicking here, as this forcibly closes the session and thus ensures we are not leaving locks hanging around
m.Logger.Fatalf("Failed to release advisory lock: %v", err)
m.Logger.Fatalf("Failed to release advisory lock: %v", rollbackErr)
}
}()

Expand Down
8 changes: 4 additions & 4 deletions internal/component/sql/migrations/sqlite/sqlite_migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ func (m *Migrations) Perform(ctx context.Context, migrationFns []sqlinternal.Mig
return
}
queryCtx, cancel = context.WithTimeout(ctx, time.Minute)
_, err = m.conn.ExecContext(queryCtx, "ROLLBACK TRANSACTION")
_, rollbackErr := m.conn.ExecContext(queryCtx, "ROLLBACK TRANSACTION")
cancel()
if err != nil {
// Panicking here, as this forcibly closes the session and thus ensures we are not leaving locks hanging around
m.Logger.Fatalf("Failed to rollback transaction: %v", err)
if rollbackErr != nil {
// Panicking here, as this forcibly closes the session and thus ensures we are not leaving transactions open
m.Logger.Fatalf("Failed to rollback transaction: %v", rollbackErr)
}
}()

Expand Down
25 changes: 15 additions & 10 deletions nameresolution/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package consul

import (
"context"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -238,7 +239,7 @@ func newResolver(logger logger.Logger, resolverConfig resolverConfig, client cli
}

// Init will configure component. It will also register service or validate client connection based on config.
func (r *resolver) Init(metadata nr.Metadata) (err error) {
func (r *resolver) Init(ctx context.Context, metadata nr.Metadata) (err error) {
r.config, err = getConfig(metadata)
if err != nil {
return err
Expand Down Expand Up @@ -274,7 +275,7 @@ func (r *resolver) Init(metadata nr.Metadata) (err error) {
}

// ResolveID resolves name to address via consul.
func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) {
func (r *resolver) ResolveID(ctx context.Context, req nr.ResolveRequest) (addr string, err error) {
cfg := r.config
svc, err := r.getService(req.ID)
if err != nil {
Expand Down Expand Up @@ -327,7 +328,8 @@ func formatAddress(address string, port string) (addr string, err error) {

// getConfig configuration from metadata, defaults are best suited for self-hosted mode.
func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
if metadata.Properties[nr.DaprPort] == "" {
props := metadata.GetPropertiesMap()
if props[nr.DaprPort] == "" {
return resolverCfg, fmt.Errorf("metadata property missing: %s", nr.DaprPort)
}

Expand All @@ -341,7 +343,7 @@ func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
resolverCfg.UseCache = cfg.UseCache

resolverCfg.Client = getClientConfig(cfg)
resolverCfg.Registration, err = getRegistrationConfig(cfg, metadata.Properties)
resolverCfg.Registration, err = getRegistrationConfig(cfg, props)
if err != nil {
return resolverCfg, err
}
Expand All @@ -353,7 +355,7 @@ func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
resolverCfg.Registration.Meta = map[string]string{}
}

resolverCfg.Registration.Meta[resolverCfg.DaprPortMetaKey] = metadata.Properties[nr.DaprPort]
resolverCfg.Registration.Meta[resolverCfg.DaprPortMetaKey] = props[nr.DaprPort]
}

return resolverCfg, nil
Expand Down Expand Up @@ -382,22 +384,25 @@ func getRegistrationConfig(cfg configSpec, props map[string]string) (*consul.Age
appPort string
host string
httpPort string
ok bool
)

if appID, ok = props[nr.AppID]; !ok {
appID = props[nr.AppID]
if appID == "" {
return nil, fmt.Errorf("metadata property missing: %s", nr.AppID)
}

if appPort, ok = props[nr.AppPort]; !ok {
appPort = props[nr.AppPort]
if appPort == "" {
return nil, fmt.Errorf("metadata property missing: %s", nr.AppPort)
}

if host, ok = props[nr.HostAddress]; !ok {
host = props[nr.HostAddress]
if host == "" {
return nil, fmt.Errorf("metadata property missing: %s", nr.HostAddress)
}

if httpPort, ok = props[nr.DaprHTTPPort]; !ok {
httpPort = props[nr.DaprHTTPPort]
if httpPort == "" {
return nil, fmt.Errorf("metadata property missing: %s", nr.DaprHTTPPort)
} else if _, err := strconv.ParseUint(httpPort, 10, 0); err != nil {
return nil, fmt.Errorf("error parsing %s: %w", nr.DaprHTTPPort, err)
Expand Down

0 comments on commit 8680e27

Please sign in to comment.