Skip to content

Commit

Permalink
Feature: now we support the rqlite which is distributed database as c…
Browse files Browse the repository at this point in the history
…urveadm's database (opencurve#206).

Signed-off-by: Wine93 <[email protected]>
  • Loading branch information
Wine93 authored and caoxianfei1 committed Aug 7, 2023
1 parent c725b5d commit 5d4bb18
Show file tree
Hide file tree
Showing 14 changed files with 693 additions and 388 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: build debug test tar
.PHONY: build debug install test upload lint

# go env
GOPROXY := "https://goproxy.cn,direct"
Expand Down Expand Up @@ -60,6 +60,9 @@ build:
debug:
$(GOENV) $(GO) build -o $(OUTPUT) $(DEBUG_FLAGS) $(PACKAGES)

install:
cp bin/curveadm ~/.curveadm/bin

test:
$(GO_TEST) $(TEST_FLAGS) ./...

Expand Down
9 changes: 3 additions & 6 deletions cli/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type CurveAdm struct {
pluginDir string
logDir string
tempDir string
dbpath string
logpath string
config *configure.CurveAdmConfig

Expand Down Expand Up @@ -142,9 +141,9 @@ func (curveadm *CurveAdm) init() error {
// (4) Init error code
errno.Init(logpath)

// (5) New storage: create table in sqlite
dbpath := fmt.Sprintf("%s/curveadm.db", curveadm.dataDir)
s, err := storage.NewStorage(dbpath)
// (5) New storage: create table in sqlite/rqlite
dbUrl := config.GetDBUrl()
s, err := storage.NewStorage(dbUrl)
if err != nil {
log.Error("Init SQLite database failed",
log.Field("Error", err))
Expand Down Expand Up @@ -174,7 +173,6 @@ func (curveadm *CurveAdm) init() error {
log.Field("ClusterName", cluster.Name))
}

curveadm.dbpath = dbpath
curveadm.logpath = logpath
curveadm.config = config
curveadm.in = os.Stdin
Expand Down Expand Up @@ -253,7 +251,6 @@ func (curveadm *CurveAdm) DataDir() string { return curveadm.d
func (curveadm *CurveAdm) PluginDir() string { return curveadm.pluginDir }
func (curveadm *CurveAdm) LogDir() string { return curveadm.logDir }
func (curveadm *CurveAdm) TempDir() string { return curveadm.tempDir }
func (curveadm *CurveAdm) DBPath() string { return curveadm.dbpath }
func (curveadm *CurveAdm) LogPath() string { return curveadm.logpath }
func (curveadm *CurveAdm) Config() *configure.CurveAdmConfig { return curveadm.config }
func (curveadm *CurveAdm) SudoAlias() string { return curveadm.config.GetSudoAlias() }
Expand Down
7 changes: 5 additions & 2 deletions cli/command/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
* Author: Jingli Chen (Wine93)
*/

// __SIGN_BY_WINE93__

package command

import (
Expand Down Expand Up @@ -60,6 +58,11 @@ func NewExecCommand(curveadm *cli.CurveAdm) *cobra.Command {
return cmd
}

// exec:
// 1. parse cluster topology
// 2. filter service
// 3. get container id
// 4. exec cmd in remote container
func runExec(curveadm *cli.CurveAdm, options execOptions) error {
// 1) parse cluster topology
dcs, err := curveadm.ParseTopology()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ require (
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rivo/uniseg v0.4.3 // indirect
github.com/rqlite/gorqlite v0.0.0-20230310040812-ec5e524a562e
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,8 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rqlite/gorqlite v0.0.0-20230310040812-ec5e524a562e h1:updBXFrJFAJO/3b/mctukZQEIVUq09iwV/wireIlZFA=
github.com/rqlite/gorqlite v0.0.0-20230310040812-ec5e524a562e/go.mod h1:xF/KoXmrRyahPfo5L7Szb5cAAUl53dMWBh9cMruGEZg=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4=
Expand Down
101 changes: 85 additions & 16 deletions internal/configure/curveadm/curveadm.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
package curveadm

import (
"fmt"
"os"
"regexp"

"github.com/opencurve/curveadm/internal/build"
"github.com/opencurve/curveadm/internal/errno"
"github.com/opencurve/curveadm/internal/utils"
Expand All @@ -40,6 +44,9 @@ import (
* [ssh_connections]
* retries = 3
* timeout = 10
*
* [database]
* url = "sqlite:///home/curve/.curveadm/data/curveadm.db"
*/
const (
KEY_LOG_LEVEL = "log_level"
Expand All @@ -48,6 +55,13 @@ const (
KEY_AUTO_UPGRADE = "auto_upgrade"
KEY_SSH_RETRIES = "retries"
KEY_SSH_TIMEOUT = "timeout"
KEY_DB_URL = "url"

// rqlite://127.0.0.1:4000
// sqlite:///home/curve/.curveadm/data/curveadm.db
REGEX_DB_URL = "^(sqlite|rqlite)://(.+)$"
DB_SQLITE = "sqlite"
DB_RQLITE = "rqlite"

WITHOUT_SUDO = " "
)
Expand All @@ -60,26 +74,19 @@ type (
AutoUpgrade bool
SSHRetries int
SSHTimeout int
DBUrl string
}

CurveAdm struct {
Defaults map[string]interface{} `mapstructure:"defaults"`
SSHConnections map[string]interface{} `mapstructure:"ssh_connections"`
DataBase map[string]interface{} `mapstructure:"database"`
}
)

var (
GlobalCurveAdmConfig *CurveAdmConfig

defaultCurveAdmConfig = &CurveAdmConfig{
LogLevel: "error",
SudoAlias: "sudo",
Timeout: 180,
AutoUpgrade: true,
SSHRetries: 3,
SSHTimeout: 10,
}

SUPPORT_LOG_LEVEL = map[string]bool{
"debug": true,
"info": true,
Expand All @@ -92,6 +99,20 @@ func ReplaceGlobals(cfg *CurveAdmConfig) {
GlobalCurveAdmConfig = cfg
}

func newDefault() *CurveAdmConfig {
home, _ := os.UserHomeDir()
cfg := &CurveAdmConfig{
LogLevel: "error",
SudoAlias: "sudo",
Timeout: 180,
AutoUpgrade: true,
SSHRetries: 3,
SSHTimeout: 10,
DBUrl: fmt.Sprintf("sqlite://%s/.curveadm/data/curveadm.db", home),
}
return cfg
}

// TODO(P2): using ItemSet to check value type
func requirePositiveInt(k string, v interface{}) (int, error) {
num, ok := utils.Str2Int(v.(string))
Expand Down Expand Up @@ -141,6 +162,7 @@ func parseDefaultsSection(cfg *CurveAdmConfig, defaults map[string]interface{})
}
cfg.Timeout = num

// auto upgrade
case KEY_AUTO_UPGRADE:
yes, err := requirePositiveBool(KEY_AUTO_UPGRADE, v)
if err != nil {
Expand Down Expand Up @@ -189,8 +211,39 @@ func parseConnectionSection(cfg *CurveAdmConfig, connection map[string]interface
return nil
}

func parseDatabaseSection(cfg *CurveAdmConfig, database map[string]interface{}) error {
if database == nil {
return nil
}

for k, v := range database {
switch k {
// database url
case KEY_DB_URL:
dbUrl := v.(string)
pattern := regexp.MustCompile(REGEX_DB_URL)
mu := pattern.FindStringSubmatch(dbUrl)
if len(mu) == 0 {
return errno.ERR_UNSUPPORT_CURVEADM_DATABASE_URL.F("url: %s", dbUrl)
}
cfg.DBUrl = dbUrl

default:
return errno.ERR_UNSUPPORT_CURVEADM_CONFIGURE_ITEM.
F("%s: %s", k, v)
}
}

return nil
}

type sectionParser struct {
parser func(*CurveAdmConfig, map[string]interface{}) error
section map[string]interface{}
}

func ParseCurveAdmConfig(filename string) (*CurveAdmConfig, error) {
cfg := defaultCurveAdmConfig
cfg := newDefault()
if !utils.PathExist(filename) {
build.DEBUG(build.DEBUG_CURVEADM_CONFIGURE, cfg)
return cfg, nil
Expand All @@ -211,13 +264,16 @@ func ParseCurveAdmConfig(filename string) (*CurveAdmConfig, error) {
return nil, errno.ERR_PARSE_CURVRADM_CONFIGURE_FAILED.E(err)
}

err = parseDefaultsSection(cfg, global.Defaults)
if err != nil {
return nil, err
items := []sectionParser{
{parseDefaultsSection, global.Defaults},
{parseConnectionSection, global.SSHConnections},
{parseDatabaseSection, global.DataBase},
}
err = parseConnectionSection(cfg, global.SSHConnections)
if err != nil {
return nil, err
for _, item := range items {
err := item.parser(cfg, item.section)
if err != nil {
return nil, err
}
}

build.DEBUG(build.DEBUG_CURVEADM_CONFIGURE, cfg)
Expand All @@ -235,3 +291,16 @@ func (cfg *CurveAdmConfig) GetSudoAlias() string {
}
return cfg.SudoAlias
}

func (cfg *CurveAdmConfig) GetDBUrl() string {
return cfg.DBUrl
}

func (cfg *CurveAdmConfig) GetDBPath() string {
pattern := regexp.MustCompile(REGEX_DB_URL)
mu := pattern.FindStringSubmatch(cfg.DBUrl)
if len(mu) == 0 || mu[1] != DB_SQLITE {
return ""
}
return mu[2]
}
1 change: 1 addition & 0 deletions internal/errno/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ var (
// 311: configure (curveadm.cfg: invalid configure value)
ERR_UNSUPPORT_CURVEADM_LOG_LEVEL = EC(311000, "unsupport curveadm log level")
ERR_UNSUPPORT_CURVEADM_CONFIGURE_ITEM = EC(311001, "unsupport curveadm configure item")
ERR_UNSUPPORT_CURVEADM_DATABASE_URL = EC(311002, "unsupport curveadm database url")

// 320: configure (hosts.yaml: parse failed)
ERR_HOSTS_FILE_NOT_FOUND = EC(320000, "hosts file not found")
Expand Down
40 changes: 40 additions & 0 deletions internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2023 NetEase Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the Licensele().
*/

/*
* Project: CurveAdm
* Created Date: 2023-05-24
* Author: Jingli Chen (Wine93)
*/

package driver

type IQueryResult interface {
Next() bool
Scan(dest ...any) error
Close() error
}

type IWriteResult interface {
LastInsertId() (int64, error)
}

type IDataBaseDriver interface {
Open(dbUrl string) error
Close() error
Query(query string, args ...any) (IQueryResult, error)
Write(query string, args ...any) (IWriteResult, error)
}
Loading

0 comments on commit 5d4bb18

Please sign in to comment.