Skip to content

Commit

Permalink
Isolation and code reuse of components (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
seeflood authored Jul 30, 2021
1 parent 3802c45 commit dbc468f
Show file tree
Hide file tree
Showing 29 changed files with 767 additions and 432 deletions.
149 changes: 5 additions & 144 deletions components/lock/etcd/etcd_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,17 @@ package etcd

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"strconv"
"strings"
"time"

"go.etcd.io/etcd/client/v3"
"mosn.io/layotto/components/pkg/utils"

"mosn.io/layotto/components/lock"
"mosn.io/pkg/log"
)

const (
defaultDialTimeout = 5
defaultKeyPrefix = "/layotto/"

prefixKey = "keyPrefixPath"
usernameKey = "username"
passwordKey = "password"
dialTimeoutKey = "dialTimeout"
endpointsKey = "endpoints"
tlsCertPathKey = "tlsCert"
tlsCertKeyPathKey = "tlsCertKey"
tlsCaPathKey = "tlsCa"
)

type EtcdLock struct {
client *clientv3.Client
metadata metadata
metadata utils.EtcdMetadata

features []lock.Feature
logger log.ErrorLogger
Expand All @@ -54,13 +33,13 @@ func NewEtcdLock(logger log.ErrorLogger) *EtcdLock {

func (e *EtcdLock) Init(metadata lock.Metadata) error {
// 1. parse config
m, err := parseEtcdMetadata(metadata)
m, err := utils.ParseEtcdMetadata(metadata.Properties)
if err != nil {
return err
}
e.metadata = m
// 2. construct client
if e.client, err = e.newClient(m); err != nil {
if e.client, err = utils.NewEtcdClient(m); err != nil {
return err
}

Expand Down Expand Up @@ -134,130 +113,12 @@ func (e *EtcdLock) Close() error {
return e.client.Close()
}

func (e *EtcdLock) newClient(meta metadata) (*clientv3.Client, error) {

config := clientv3.Config{
Endpoints: meta.endpoints,
DialTimeout: time.Second * time.Duration(meta.dialTimeout),
Username: meta.username,
Password: meta.password,
}

if meta.tlsCa != "" || meta.tlsCert != "" || meta.tlsCertKey != "" {
//enable tls
cert, err := tls.LoadX509KeyPair(meta.tlsCert, meta.tlsCertKey)
if err != nil {
return nil, fmt.Errorf("error reading tls certificate, cert: %s, certKey: %s, err: %s", meta.tlsCert, meta.tlsCertKey, err)
}

caData, err := ioutil.ReadFile(meta.tlsCa)
if err != nil {
return nil, fmt.Errorf("error reading tls ca %s, err: %s", meta.tlsCa, err)
}

pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caData)

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: pool,
}
config.TLS = tlsConfig
}

if client, err := clientv3.New(config); err != nil {
return nil, err
} else {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(meta.dialTimeout))
defer cancel()
//ping
_, err = client.Get(ctx, "ping")
if err != nil {
return nil, fmt.Errorf("etcd lock error: connect to etcd timeoout %s", meta.endpoints)
}

return client, nil
}
}

func (e *EtcdLock) getKey(resourceId string) string {
return fmt.Sprintf("%s%s", e.metadata.keyPrefix, resourceId)
return fmt.Sprintf("%s%s", e.metadata.KeyPrefix, resourceId)
}

func newInternalErrorUnlockResponse() *lock.UnlockResponse {
return &lock.UnlockResponse{
Status: lock.INTERNAL_ERROR,
}
}

func parseEtcdMetadata(meta lock.Metadata) (metadata, error) {
m := metadata{}
var err error

if val, ok := meta.Properties[endpointsKey]; ok && val != "" {
m.endpoints = strings.Split(val, ";")
} else {
return m, errors.New("etcd lock error: missing endpoints address")
}

if val, ok := meta.Properties[dialTimeoutKey]; ok && val != "" {
if m.dialTimeout, err = strconv.Atoi(val); err != nil {
return m, fmt.Errorf("etcd lock error: ncorrect dialTimeout value %s", val)
}
} else {
m.dialTimeout = defaultDialTimeout
}

if val, ok := meta.Properties[prefixKey]; ok && val != "" {
m.keyPrefix = addPathSeparator(val)
} else {
m.keyPrefix = defaultKeyPrefix
}

if val, ok := meta.Properties[usernameKey]; ok && val != "" {
m.username = val
}

if val, ok := meta.Properties[passwordKey]; ok && val != "" {
m.password = val
}

if val, ok := meta.Properties[tlsCaPathKey]; ok && val != "" {
m.tlsCa = val
}

if val, ok := meta.Properties[tlsCertPathKey]; ok && val != "" {
m.tlsCert = val
}

if val, ok := meta.Properties[tlsCertKeyPathKey]; ok && val != "" {
m.tlsCertKey = val
}

return m, nil
}

func addPathSeparator(p string) string {
if p == "" {
return "/"
}
if p[0] != '/' {
p = "/" + p
}
if p[len(p)-1] != '/' {
p = p + "/"
}
return p
}

type metadata struct {
keyPrefix string
dialTimeout int
endpoints []string
username string
password string

tlsCa string
tlsCert string
tlsCertKey string
}
93 changes: 5 additions & 88 deletions components/lock/redis/standalone_redis_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,18 @@ package redis

import (
"context"
"crypto/tls"
"errors"
"fmt"
"github.com/go-redis/redis/v8"
"mosn.io/layotto/components/lock"
"mosn.io/layotto/components/pkg/utils"
"mosn.io/pkg/log"
"strconv"
"time"
)

const (
host = "redisHost"
password = "redisPassword"
enableTLS = "enableTLS"
maxRetries = "maxRetries"
maxRetryBackoff = "maxRetryBackoff"
defaultBase = 10
defaultBitSize = 0
defaultDB = 0
defaultMaxRetries = 3
defaultMaxRetryBackoff = time.Second * 2
defaultEnableTLS = false
)

// Standalone Redis lock store.Any fail-over related features are not supported,such as Sentinel and Redis Cluster.
type StandaloneRedisLock struct {
client *redis.Client
metadata metadata
metadata utils.RedisMetadata
replicas int

features []lock.Feature
Expand All @@ -51,37 +35,21 @@ func NewStandaloneRedisLock(logger log.ErrorLogger) *StandaloneRedisLock {

func (p *StandaloneRedisLock) Init(metadata lock.Metadata) error {
// 1. parse config
m, err := parseRedisMetadata(metadata)
m, err := utils.ParseRedisMetadata(metadata.Properties)
if err != nil {
return err
}
p.metadata = m
// 2. construct client
p.client = p.newClient(m)
p.client = utils.NewRedisClient(m)
p.ctx, p.cancel = context.WithCancel(context.Background())
// 3. connect to redis
if _, err = p.client.Ping(p.ctx).Result(); err != nil {
return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", m.host, err)
return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", m.Host, err)
}
return err
}

func (p *StandaloneRedisLock) newClient(m metadata) *redis.Client {
opts := &redis.Options{
Addr: m.host,
Password: m.password,
DB: defaultDB,
MaxRetries: m.maxRetries,
MaxRetryBackoff: m.maxRetryBackoff,
}
if m.enableTLS {
opts.TLSConfig = &tls.Config{
InsecureSkipVerify: m.enableTLS,
}
}
return redis.NewClient(opts)
}

func (p *StandaloneRedisLock) Features() []lock.Feature {
return p.features
}
Expand Down Expand Up @@ -145,54 +113,3 @@ func (p *StandaloneRedisLock) Close() error {

return p.client.Close()
}

func parseRedisMetadata(meta lock.Metadata) (metadata, error) {
m := metadata{}

if val, ok := meta.Properties[host]; ok && val != "" {
m.host = val
} else {
return m, errors.New("redis store error: missing host address")
}

if val, ok := meta.Properties[password]; ok && val != "" {
m.password = val
}

m.enableTLS = defaultEnableTLS
if val, ok := meta.Properties[enableTLS]; ok && val != "" {
tls, err := strconv.ParseBool(val)
if err != nil {
return m, fmt.Errorf("redis store error: can't parse enableTLS field: %s", err)
}
m.enableTLS = tls
}

m.maxRetries = defaultMaxRetries
if val, ok := meta.Properties[maxRetries]; ok && val != "" {
parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize)
if err != nil {
return m, fmt.Errorf("redis store error: can't parse maxRetries field: %s", err)
}
m.maxRetries = int(parsedVal)
}

m.maxRetryBackoff = defaultMaxRetryBackoff
if val, ok := meta.Properties[maxRetryBackoff]; ok && val != "" {
parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize)
if err != nil {
return m, fmt.Errorf("redis store error: can't parse maxRetryBackoff field: %s", err)
}
m.maxRetryBackoff = time.Duration(parsedVal)
}

return m, nil
}

type metadata struct {
host string
password string
maxRetries int
maxRetryBackoff time.Duration
enableTLS bool
}
Loading

0 comments on commit dbc468f

Please sign in to comment.