Skip to content

Commit

Permalink
add delete broken copyset in chunkserver cli
Browse files Browse the repository at this point in the history
add disk full DiskState and retry readonly response

Signed-off-by: liuminjian <[email protected]>
  • Loading branch information
liuminjian committed Nov 15, 2023
1 parent 01e9dc4 commit d39e3fb
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 64 deletions.
2 changes: 0 additions & 2 deletions proto/heartbeat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ message CopySetConf {
// 表示待删除节点。
// chunkserver收到CHANGE_PEER,根据peers,configchangeItem,oldPeer拼出新的conf
optional common.Peer oldPeer = 7;
// copyset availflag
optional bool availflag = 8;
};

enum HeartbeatStatusCode {
Expand Down
1 change: 1 addition & 0 deletions proto/topology.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ enum ChunkServerStatus {
enum DiskState {
DISKNORMAL = 0;
DISKERROR = 1;
DISKFULL = 2;
}

enum OnlineState {
Expand Down
11 changes: 8 additions & 3 deletions src/chunkserver/heartbeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,14 @@ int Heartbeat::BuildRequest(HeartbeatRequest* req) {
int leaders = 0;

for (CopysetNodePtr copyset : copysets) {

// 如果磁盘空间不足设为readonly
if (diskState->errtype() == curve::mds::heartbeat::DISKFULL) {
copyset->SetReadOnly(true);
}else {
copyset->SetReadOnly(false);
}

curve::mds::heartbeat::CopySetInfo* info = req->add_copysetinfos();

ret = BuildCopysetInfo(info, copyset);
Expand Down Expand Up @@ -439,9 +447,6 @@ int Heartbeat::ExecTask(const HeartbeatResponse& response) {
continue;
}

// 判断copyset是否avail,否则设置readonly
copyset->SetReadOnly(!conf.availflag());

// 解析该chunkserver上的copyset是否需要删除
// 需要删除则清理copyset
if (HeartbeatHelper::NeedPurge(csEp_, conf, copyset)) {
Expand Down
1 change: 1 addition & 0 deletions src/client/chunk_closure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ void ClientClosure::Run() {
break;

case CHUNK_OP_STATUS::CHUNK_OP_STATUS_READONLY:
needRetry = true;
OnReadOnly();
break;

Expand Down
3 changes: 0 additions & 3 deletions src/mds/heartbeat/copyset_conf_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ bool CopysetConfGenerator::GenCopysetConf(
return true;
}

// set copyset availflag
copysetConf->set_availflag(recordCopySetInfo.IsAvailable());

if (reportCopySetInfo.GetLeader() == reportId) {
ChunkServerIdType candidate =
LeaderGenCopysetConf(reportCopySetInfo, configChInfo, copysetConf);
Expand Down
25 changes: 8 additions & 17 deletions src/mds/heartbeat/heartbeat_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,40 +102,31 @@ void HeartbeatManager::UpdateChunkServerDiskStatus(
// update ChunkServerState status (disk status)
ChunkServerState state;

switch (request.diskstate().errtype())
{
case curve::mds::heartbeat::DISKFULL:
{
curve::mds::heartbeat::ErrorType errType = request.diskstate().errtype();

if (errType == curve::mds::heartbeat::DISKFULL) {
// 当chunkserver磁盘接近满,需要将copyset availflag设为false,避免新空间从这些copyset分配
CopySetFilter filter = [](const curve::mds::topology::CopySetInfo &cs) {
return cs.IsAvailable();
};
std::vector<CopySetKey> keys = topology_->GetCopySetsInChunkServer(request.chunkserverid(), filter);
for (auto key : keys) {
topology_->SetCopySetAvalFlag(key, false);
topology_->SetCopySetAvalFlag(key, false);
}
// 设置disk error,copyset就不会迁移到这个chunkserver
state.SetDiskState(curve::mds::topology::DISKERROR);
state.SetDiskState(curve::mds::topology::DISKFULL);
LOG(ERROR) << "heartbeat report disk full error, "
<< "diskused = " << request.diskused()
<< "capacity = " << request.diskcapacity()
<< "chunkserverid =" << request.chunkserverid();
break;
}

case curve::mds::heartbeat::NORMAL:
{

}else if (errType == curve::mds::heartbeat::NORMAL) {
state.SetDiskState(curve::mds::topology::DISKNORMAL);
break;
}
default:
{
}else {
state.SetDiskState(curve::mds::topology::DISKERROR);
LOG(ERROR) << "heartbeat report disk error, "
<< "errortype = " << request.diskstate().errtype()
<< "errmsg = " << request.diskstate().errmsg();
break;
}
}

state.SetDiskCapacity(request.diskcapacity());
Expand Down
76 changes: 38 additions & 38 deletions tools-v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,34 @@ module github.com/opencurve/curve/tools-v2

go 1.19

replace github.com/optiopay/kafka => github.com/cilium/kafka v0.0.0-20180809090225-01ce283b732b

require (
github.com/cilium/cilium v1.13.7
github.com/deckarep/golang-set/v2 v2.3.0
github.com/docker/cli v24.0.2+incompatible
github.com/cilium/cilium v1.14.3
github.com/deckarep/golang-set/v2 v2.3.1
github.com/docker/cli v24.0.7+incompatible
github.com/dustin/go-humanize v1.0.1
github.com/gookit/color v1.5.3
github.com/gookit/color v1.5.4
github.com/moby/term v0.5.0
github.com/olekukonko/tablewriter v0.0.5
github.com/pkg/xattr v0.4.9
github.com/schollz/progressbar/v3 v3.13.1
github.com/smartystreets/goconvey v1.8.0
github.com/spf13/cobra v1.7.0
github.com/schollz/progressbar/v3 v3.14.0
github.com/smartystreets/goconvey v1.8.1
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.16.0
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
golang.org/x/sys v0.13.0
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
github.com/spf13/viper v1.17.0
golang.org/x/exp v0.0.0-20231108232855-2478ac86f678
golang.org/x/sys v0.14.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
)

require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/docker v24.0.2+incompatible // indirect
github.com/docker/docker-credential-helpers v0.7.0 // indirect
github.com/docker/docker-credential-helpers v0.8.0 // indirect
github.com/docker/go v1.5.1-1.0.20160303222718-d30aec9fd63c // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
Expand All @@ -47,39 +44,42 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/pkcs11 v1.1.1 // indirect
github.com/miekg/pkcs11 v1.0.2 // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.15.1 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/sagikazarmark/locafero v0.3.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartystreets/assertions v1.13.1 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/smarty/assertions v1.15.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.10.0 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/theupdateframework/notary v0.7.0 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.9.3 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/term v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.0.3 // indirect
gotest.tools/v3 v3.5.1 // indirect
)
102 changes: 102 additions & 0 deletions tools-v2/pkg/cli/command/curvebs/delete/copyset/copyset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package copyset

import (
"context"
cmderror "github.com/opencurve/curve/tools-v2/internal/error"
basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command"
"github.com/opencurve/curve/tools-v2/pkg/config"
"github.com/opencurve/curve/tools-v2/pkg/output"
"github.com/opencurve/curve/tools-v2/proto/proto/topology"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"strconv"
)

// deleteBrokenCopySetExample is the usage example attached to the
// broken-copyset command.
const deleteBrokenCopySetExample = `$ curve bs delete broken-copyset --chunkserverid=1`

// DeleteBrokenCopySetInChunkServerRpc bundles everything needed to issue the
// DeleteBrokenCopysetInChunkServer call against the topology service.
type DeleteBrokenCopySetInChunkServerRpc struct {
// Info is the RPC descriptor built with basecmd.NewRpc.
Info *basecmd.Rpc
// Request is the message passed to DeleteBrokenCopysetInChunkServer.
Request *topology.DeleteBrokenCopysetInChunkServerRequest
// topologyClient is set by NewRpcClient and used by Stub_Func.
topologyClient topology.TopologyServiceClient
}

// Compile-time assertion that *DeleteBrokenCopySetInChunkServerRpc satisfies
// basecmd.RpcFunc.
var _ basecmd.RpcFunc = (*DeleteBrokenCopySetInChunkServerRpc)(nil) // check interface

// DeleteBrokenCopySetCommand is the "broken-copyset" command: it asks the
// topology service to delete the broken copysets of one chunkserver.
type DeleteBrokenCopySetCommand struct {
basecmd.FinalCurveCmd
// Rpc is the prepared RPC; it is populated by Init and sent by RunCommand.
Rpc *DeleteBrokenCopySetInChunkServerRpc
// NOTE(review): Servers is not read or written anywhere in this file —
// confirm whether it is still needed.
Servers []*topology.ServerInfo
// chunkserverid is the target chunkserver id parsed from the command flags
// in Init; the request holds a pointer to this field.
chunkserverid uint32
}

// Stub_Func sends the stored request through the topology client and hands
// back the response together with any error from the call.
func (d *DeleteBrokenCopySetInChunkServerRpc) Stub_Func(ctx context.Context) (interface{}, error) {
	response, callErr := d.topologyClient.DeleteBrokenCopysetInChunkServer(ctx, d.Request)
	return response, callErr
}

// NewDeleteCommand builds the broken-copyset command and returns its
// underlying cobra command.
func NewDeleteCommand() *cobra.Command {
	brokenCopySetCmd := NewDeleteBrokenCopySetCommand()
	return brokenCopySetCmd.Cmd
}

// NewDeleteBrokenCopySetCommand creates the "broken-copyset" command with its
// usage metadata and registers it through basecmd.NewFinalCurveCli.
func NewDeleteBrokenCopySetCommand() *DeleteBrokenCopySetCommand {
	deleteCmd := new(DeleteBrokenCopySetCommand)
	deleteCmd.FinalCurveCmd = basecmd.FinalCurveCmd{
		Use:     "broken-copyset",
		Short:   "delete broken copyset in chunkserver",
		Example: deleteBrokenCopySetExample,
	}

	basecmd.NewFinalCurveCli(&deleteCmd.FinalCurveCmd, deleteCmd)
	return deleteCmd
}

// AddFlags registers the flags this command reads: MDS address, RPC retry
// times, RPC timeout and the chunkserver id.
func (d *DeleteBrokenCopySetCommand) AddFlags() {
	target := d.Cmd

	// Connection and RPC behaviour flags.
	config.AddBsMdsFlagOption(target)
	config.AddRpcRetryTimesFlag(target)
	config.AddRpcTimeoutFlag(target)

	// Flag identifying the chunkserver to operate on.
	config.AddBsChunkServerIdFlag(target)
}

// NewRpcClient creates a topology service client on the given connection and
// stores it for later use by Stub_Func.
func (d *DeleteBrokenCopySetInChunkServerRpc) NewRpcClient(cc grpc.ClientConnInterface) {
	client := topology.NewTopologyServiceClient(cc)
	d.topologyClient = client
}

// Init reads the MDS addresses, RPC timeout, RPC retry count and the target
// chunkserver id from the command's flags and builds the
// DeleteBrokenCopysetInChunkServer RPC that RunCommand sends.
//
// It returns an error when the MDS address list cannot be obtained, or when
// the chunkserver id flag is not an unsigned decimal integer that fits in 32
// bits.
func (d *DeleteBrokenCopySetCommand) Init(cmd *cobra.Command, args []string) error {
	mdsAddrs, err := config.GetBsMdsAddrSlice(d.Cmd)
	if err.TypeCode() != cmderror.CODE_SUCCESS {
		return err.ToError()
	}
	timeout := config.GetFlagDuration(d.Cmd, config.RPCTIMEOUT)
	retrytimes := config.GetFlagInt32(d.Cmd, config.RPCRETRYTIMES)

	// Parse directly as an unsigned 32-bit value. Going through Atoi and a
	// uint32 conversion would silently wrap negative or oversized ids onto a
	// different chunkserver, which is dangerous for a delete operation.
	id, e := strconv.ParseUint(config.GetBsFlagString(d.Cmd, config.CURVEBS_CHUNKSERVER_ID), 10, 32)
	if e != nil {
		return e
	}
	d.chunkserverid = uint32(id)

	// The request keeps a pointer to d.chunkserverid, so the field must be
	// assigned before the request is built and left untouched afterwards.
	d.Rpc = &DeleteBrokenCopySetInChunkServerRpc{
		Info: basecmd.NewRpc(mdsAddrs, timeout, retrytimes, "DeleteBrokenCopysetInChunkServer"),
		Request: &topology.DeleteBrokenCopysetInChunkServerRequest{
			ChunkServerID: &d.chunkserverid,
		},
	}
	return nil
}

// RunCommand sends the prepared RPC via basecmd.GetRpcResponse. On success the
// response is stored in d.Result; otherwise the command error is converted
// with ToError and returned.
func (d *DeleteBrokenCopySetCommand) RunCommand(cmd *cobra.Command, args []string) error {
	response, rpcErr := basecmd.GetRpcResponse(d.Rpc.Info, d.Rpc)
	if rpcErr.TypeCode() == cmderror.CODE_SUCCESS {
		d.Result = response
		return nil
	}
	return rpcErr.ToError()
}

// Print renders the command result by delegating to output.FinalCmdOutput.
func (d *DeleteBrokenCopySetCommand) Print(cmd *cobra.Command, args []string) error {
	printErr := output.FinalCmdOutput(&d.FinalCurveCmd, d)
	return printErr
}

// ResultPlainOutput emits the result in plain form by delegating to
// output.FinalCmdOutputPlain.
func (d *DeleteBrokenCopySetCommand) ResultPlainOutput() error {
	plainErr := output.FinalCmdOutputPlain(&d.FinalCurveCmd)
	return plainErr
}
2 changes: 2 additions & 0 deletions tools-v2/pkg/cli/command/curvebs/delete/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package delete

import (
"github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/delete/copyset"
"github.com/spf13/cobra"

basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command"
Expand All @@ -26,6 +27,7 @@ func (dCmd *DeleteCommand) AddSubCommands() {
file.NewFileCommand(),
peer.NewCommand(),
volume.NewVolumeCommand(),
copyset.NewDeleteCommand(),
)
}

Expand Down
2 changes: 2 additions & 0 deletions tools-v2/pkg/cli/command/curvebs/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/server"
"github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/snapshot"
"github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/space"
"github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/unavailcopysets"
"github.com/spf13/cobra"
)

Expand All @@ -55,6 +56,7 @@ func (listCmd *ListCommand) AddSubCommands() {
may_broken_vol.NewMayBrokenVolCommand(),
formatstatus.NewFormatStatusCommand(),
snapshot.NewSnapShotCommand(),
unavailcopysets.NewUnAvailCopySetsCommand(),
)
}

Expand Down
Loading

0 comments on commit d39e3fb

Please sign in to comment.