ETCD数据库源码分析——etcd gRPC 服务 API

编程入门 行业动态 更新时间:2024-10-25 22:37:28

ETCD数据库<a href=https://www.elefans.com/category/jswz/34/1770099.html style=源码分析——etcd gRPC 服务 API"/>

ETCD数据库源码分析——etcd gRPC 服务 API

本文将会开始介绍 etcd3 API 的核心设计,主要针对常见的 API 接口服务。对于理解 etcd 基本思想有很大的帮助。所有 etcd3 API 均在 gRPC 服务中定义,该服务对 etcd 服务器可以理解的远程过程调用(RPC)进行分类。发送到etcd服务器的每个API请求都是一个gRPC远程过程调用。etcd3 中的 RPC 接口定义根据功能分类到服务中。处理 etcd 键值的重要服务包括:

  • KV 服务,创建,更新,获取和删除键值对。
  • 监视,监视键的更改。
  • 租约,消耗客户端保持活动消息的基元。
  • 锁,etcd 提供分布式共享锁的支持。
  • 选举,暴露客户端选举机制。

集群提供服务

KV service

api/etcdserverpb/rpc.proto文件定义了KV服务,其中定义了五个客户端最基本、最常用的RPC方法,分别是Range、Put、Txn、DeleteRange和Compact方法,这些方法都是普通的RPC方法。

各个函数的介绍如下:Range,从键值存储中获取范围内的 key;Put,设置给定 key 到键值存储,put 请求增加键值存储的修订版本并在事件历史中生成一个事件;DeleteRange,从键值存储中删除给定范围,删除请求增加键值存储的修订版本并在事件历史中为每个被删除的key生成一个删除事件;Txn,在单个事务中处理多个请求,一个 txn 请求增加键值存储的修订版本并为每个完成的请求生成带有相同修订版本的事件。不容许在一个txn中多次修改同一个key;Compact,压缩在etcd键值存储中的事件历史。键值存储应该定期压缩,否则事件历史会无限制的持续增长。RangeRequest和RangeResponse消息类型定义在api/etcdserverpb/rpc.proto 404行和471行,同样其他rpc涉及的消息类型也是定义在api/etcdserverpb/rpc.proto中。

service KV {// Range gets the keys in the range from the key-value store.rpc Range(RangeRequest) returns (RangeResponse) { }// Put puts the given key into the key-value store. A put request increments the revision of the key-value store and generates one event in the event history.rpc Put(PutRequest) returns (PutResponse) { }// DeleteRange deletes the given range from the key-value store. A delete request increments the revision of the key-value store and generates a delete event in the event history for every deleted key.rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) { }// Txn processes multiple requests in a single transaction. A txn request increments the revision of the key-value store and generates events with the same revision for every completed request. It is not allowed to modify the same key several times within one txn.rpc Txn(TxnRequest) returns (TxnResponse) { }// Compact compacts the event history in the etcd key-value store. The key-value store should be periodically compacted or the event history will continue to grow indefinitely.rpc Compact(CompactionRequest) returns (CompactionResponse) { }
}

api/etcdserverpb/rpc.pb.go文件中包含了通过protoc命令生成api/etcdserverpb/rpc.proto的服务端代码,其中定义了KVServer接口及该接口使用到的消息等内容。

Watch service

api/etcdserverpb/rpc.proto文件定义了Watch服务,其中定义了Watch RPC方法。Watch观察正在发生或已经发生的事件。 输入和输出都是流; 输入流用于创建和取消观察者,输出流用于发送事件。 One watch RPC 可以在多个键范围上同时观看多个watches的流式事件。 可以从最后一次压缩修订开始查看整个事件历史记录。

service Watch {// Watch watches for events happening or that have happened. Both input and output// are streams; the input stream is for creating and canceling watchers and the output// stream sends events. One watch RPC can watch on multiple key ranges, streaming events// for several watches at once. The entire event history can be watched starting from the// last compaction revision.rpc Watch(stream WatchRequest) returns (stream WatchResponse) {option (google.api.http) = {post: "/v3/watch"body: "*"};}
}

api/etcdserverpb/rpc.pb.go文件中包含了通过protoc命令生成api/etcdserverpb/rpc.proto的服务端代码,其中定义了WatchServer接口及该接口使用到的消息等内容。

Lease service

api/etcdserverpb/rpc.proto文件定义了Lease服务,其中定义了5个 RPC方法。LeaseGrant 创建一个租约,如果服务器在给定的生存期内没有收到 keepAlive,则该租约到期。 如果租约到期,则附加到租约的所有密钥都将到期并删除。 每个过期的密钥都会在事件历史记录中生成一个删除事件。LeaseRevoke 撤销租约。 附加到租约的所有密钥都将过期并被删除。LeaseKeepAlive 通过将保持活动请求从客户端流式传输到服务器并将保持活动响应从服务器流式传输到客户端来保持租约活动。LeaseTimeToLive 检索租约信息。LeaseLeases 列出所有现有的租约。

service Lease {// LeaseGrant creates a lease which expires if the server does not receive a keepAlive// within a given time to live period. All keys attached to the lease will be expired and// deleted if the lease expires. Each expired key generates a delete event in the event history.rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse) { };}// LeaseRevoke revokes a lease. All keys attached to the lease will expire and be deleted.rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse) { };}// LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client// to the server and streaming keep alive responses from the server to the client.rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) { };}// LeaseTimeToLive retrieves lease information.rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse) { };}// LeaseLeases lists all existing leases.rpc LeaseLeases(LeaseLeasesRequest) returns (LeaseLeasesResponse) { };}
}

api/etcdserverpb/rpc.pb.go文件中包含了通过protoc命令生成api/etcdserverpb/rpc.proto的服务端代码,其中定义了LeaseServer接口及该接口使用到的消息等内容。

Cluster service

api/etcdserverpb/rpc.proto文件定义了Cluster服务,其中定义了5个 RPC方法。MemberAdd 将成员添加到集群中。MemberRemove 从集群中删除现有成员。MemberUpdate 更新成员配置。MemberList 列出集群中的所有成员。MemberPromote 将成员从 raft 学习者(无投票权)提升为 raft 投票成员。

service Cluster {// MemberAdd adds a member into the cluster.rpc MemberAdd(MemberAddRequest) returns (MemberAddResponse) { };}// MemberRemove removes an existing member from the cluster.rpc MemberRemove(MemberRemoveRequest) returns (MemberRemoveResponse) { };}// MemberUpdate updates the member configuration.rpc MemberUpdate(MemberUpdateRequest) returns (MemberUpdateResponse) { };}// MemberList lists all the members in the cluster.rpc MemberList(MemberListRequest) returns (MemberListResponse) { };}// MemberPromote promotes a member from raft learner (non-voting) to raft voting member.rpc MemberPromote(MemberPromoteRequest) returns (MemberPromoteResponse) { };}
}

api/etcdserverpb/rpc.pb.go文件中包含了通过protoc命令生成api/etcdserverpb/rpc.proto的服务端代码,其中定义了ClusterServer接口及该接口使用到的消息等内容。

Maintenance service

api/etcdserverpb/rpc.proto文件定义了Maintenance服务,其中定义了8个 RPC方法。Alarm激活、停用和查询有关集群运行状况的警报。Status获取成员的状态。Defragment 对成员的后端数据库进行碎片整理以恢复存储空间。Hash哈希计算整个后端密钥空间的哈希,包括密钥、租约和存储中的其他存储桶。 这仅用于测试! 不要在生产中依赖于正在进行的事务,因为哈希操作不持有 MVCC 锁。 使用“HashKV”API 代替“key”存储桶一致性检查。HashKV 计算所有 MVCC 键的哈希值,直到给定修订版。 它仅在后端存储中迭代“密钥”存储桶。Snapshot快照通过流将整个后端的快照从成员发送到客户端。MoveLeader 请求当前领导节点将其领导权转移给受让人。降级请求对集群版本进行降级、验证可行性或取消降级, 自 etcd 3.5 起支持。

service Maintenance {// Alarm activates, deactivates, and queries alarms regarding cluster health.rpc Alarm(AlarmRequest) returns (AlarmResponse) { };}// Status gets the status of the member.rpc Status(StatusRequest) returns (StatusResponse) { };}// Defragment defragments a member's backend database to recover storage space.rpc Defragment(DefragmentRequest) returns (DefragmentResponse) { };}// Hash computes the hash of whole backend keyspace,// including key, lease, and other buckets in storage.// This is designed for testing ONLY!// Do not rely on this in production with ongoing transactions,// since Hash operation does not hold MVCC locks.// Use "HashKV" API instead for "key" bucket consistency checks.rpc Hash(HashRequest) returns (HashResponse) { };}// HashKV computes the hash of all MVCC keys up to a given revision.// It only iterates "key" bucket in backend storage.rpc HashKV(HashKVRequest) returns (HashKVResponse) { };}// Snapshot sends a snapshot of the entire backend from a member over a stream to a client.rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse) { };}// MoveLeader requests current leader node to transfer its leadership to transferee.rpc MoveLeader(MoveLeaderRequest) returns (MoveLeaderResponse) { };}// Downgrade requests downgrades, verifies feasibility or cancels downgrade// on the cluster version.// Supported since etcd 3.5.rpc Downgrade(DowngradeRequest) returns (DowngradeResponse) { };}
}

api/etcdserverpb/rpc.pb.go文件中包含了通过protoc命令生成api/etcdserverpb/rpc.proto的服务端代码,其中定义了MaintenanceServer接口及该接口使用到的消息等内容。

Auth service

api/etcdserverpb/rpc.proto文件定义了Auth服务,其中定义了17个 RPC方法。AuthEnable 启用身份验证。Auth Disable 禁用身份验证。AuthStatus 显示身份验证状态。AuthStatus 显示身份验证状态。Authenticate 处理一个认证请求。UserAdd 添加一个新用户。 用户名不能为空。UserGet 获取详细的用户信息。UserList 获取所有用户的列表。UserDelete 删除指定的用户。UserChangePassword 更改指定用户的密码。UserGrant 将角色授予指定用户。UserRevokeRole 撤销指定用户的角色。RoleAdd 添加一个新角色。 角色名称不能为空。RoleGet 获取详细的角色信息。RoleList 获取所有角色的列表。RoleGrantPermission 将指定键或范围的权限授予指定角色。RoleRevokePermission 撤销指定角色的键或范围权限。

service Auth {// AuthEnable enables authentication.rpc AuthEnable(AuthEnableRequest) returns (AuthEnableResponse) { };}// AuthDisable disables authentication.rpc AuthDisable(AuthDisableRequest) returns (AuthDisableResponse) { };}// AuthStatus displays authentication status.rpc AuthStatus(AuthStatusRequest) returns (AuthStatusResponse) { };}// Authenticate processes an authenticate request.rpc Authenticate(AuthenticateRequest) returns (AuthenticateResponse) { };}// UserAdd adds a new user. User name cannot be empty.rpc UserAdd(AuthUserAddRequest) returns (AuthUserAddResponse) { };}// UserGet gets detailed user information.rpc UserGet(AuthUserGetRequest) returns (AuthUserGetResponse) { };}// UserList gets a list of all users.rpc UserList(AuthUserListRequest) returns (AuthUserListResponse) { };}// UserDelete deletes a specified user.rpc UserDelete(AuthUserDeleteRequest) returns (AuthUserDeleteResponse) { };}// UserChangePassword changes the password of a specified user.rpc UserChangePassword(AuthUserChangePasswordRequest) returns (AuthUserChangePasswordResponse) { };}// UserGrant grants a role to a specified user.rpc UserGrantRole(AuthUserGrantRoleRequest) returns (AuthUserGrantRoleResponse) { };}// UserRevokeRole revokes a role of specified user.rpc UserRevokeRole(AuthUserRevokeRoleRequest) returns (AuthUserRevokeRoleResponse) { };}// RoleAdd adds a new role. Role name cannot be empty.rpc RoleAdd(AuthRoleAddRequest) returns (AuthRoleAddResponse) { };}// RoleGet gets detailed role information.rpc RoleGet(AuthRoleGetRequest) returns (AuthRoleGetResponse) { };}// RoleList gets lists of all roles.rpc RoleList(AuthRoleListRequest) returns (AuthRoleListResponse) { };}// RoleDelete deletes a specified role.rpc RoleDelete(AuthRoleDeleteRequest) returns (AuthRoleDeleteResponse) { };}// RoleGrantPermission grants a permission of a specified key or range to a specified role.rpc RoleGrantPermission(AuthRoleGrantPermissionRequest) returns (AuthRoleGrantPermissionResponse) { };}// RoleRevokePermission revokes a key or range permission of a specified role.rpc RoleRevokePermission(AuthRoleRevokePermissionRequest) returns (AuthRoleRevokePermissionResponse) { };}
}

api/etcdserverpb/rpc.pb.go文件中包含了通过protoc命令生成api/etcdserverpb/rpc.proto的服务端代码,其中定义了AuthServer接口及该接口使用到的消息等内容。

Election service

server/etcdserver/api/v3election/v3electionpb/v3eletion.proto文件定义了Election服务,其中定义了五个客户端最基本、最常用的RPC方法。Campaign 等待在选举中获得领导权,如果成功则返回代表领导权的 LeaderKey。 然后,LeaderKey 可用于在选举中发布新值,以事务方式保护 API 请求仍处于领导地位,并退出选举。Proclaim 使用新值更新领导者的发布值。Leader返回当前的选举公告,如果有的话。Observe按选举人的顺序观察流选举公告领导。Resign辞职会释放选举领导权,以便其他活动家可以在选举中获得领导权。

service Election {// Campaign waits to acquire leadership in an election, returning a LeaderKey// representing the leadership if successful. The LeaderKey can then be used// to issue new values on the election, transactionally guard API requests on// leadership still being held, and resign from the election.rpc Campaign(CampaignRequest) returns (CampaignResponse) {option (google.api.http) = {post: "/v3/election/campaign"body: "*"};}// Proclaim updates the leader's posted value with a new value.rpc Proclaim(ProclaimRequest) returns (ProclaimResponse) {option (google.api.http) = {post: "/v3/election/proclaim"body: "*"};}// Leader returns the current election proclamation, if any.rpc Leader(LeaderRequest) returns (LeaderResponse) {option (google.api.http) = {post: "/v3/election/leader"body: "*"};}// Observe streams election proclamations in-order as made by the election's// elected leaders.rpc Observe(LeaderRequest) returns (stream LeaderResponse) {option (google.api.http) = {post: "/v3/election/observe"body: "*"};}// Resign releases election leadership so other campaigners may acquire// leadership on the election.rpc Resign(ResignRequest) returns (ResignResponse) {option (google.api.http) = {post: "/v3/election/resign"body: "*"};}
}

server/etcdserver/api/v3election/v3electionpb/v3election.pb.go文件中包含了通过protoc命令生成server/etcdserver/api/v3election/v3electionpb/v3eletion.proto的服务端代码,其中定义了ElectionServer接口及该接口使用到的消息等内容。

Lock service

server/etcdserver/api/v3lock/v3lockpb/v3lock.proto文件定义了Lcok服务,其中定义了两个客户端最基本、最常用的RPC方法。Lock 在给定的命名锁上获取分布式共享锁。 成功时,只要调用者持有锁,它就会返回一个唯一的密钥。 此密钥可以与事务结合使用,以安全地确保仅在持有锁所有权时才对 etcd 进行更新。 锁会一直保持,直到在密钥上调用 Unlock 或与所有者关联的租约到期。Unlock 获取 Lock 返回的密钥并释放对锁定的保留。 下一个等待锁的锁调用者将被唤醒并获得锁的所有权。

service Lock {// Lock acquires a distributed shared lock on a given named lock.// On success, it will return a unique key that exists so long as the// lock is held by the caller. This key can be used in conjunction with// transactions to safely ensure updates to etcd only occur while holding// lock ownership. The lock is held until Unlock is called on the key or the// lease associate with the owner expires.rpc Lock(LockRequest) returns (LockResponse) {option (google.api.http) = {post: "/v3/lock/lock"body: "*"};}// Unlock takes a key returned by Lock and releases the hold on lock. The// next Lock caller waiting for the lock will then be woken up and given// ownership of the lock.rpc Unlock(UnlockRequest) returns (UnlockResponse) {option (google.api.http) = {post: "/v3/lock/unlock"body: "*"};}
}

server/etcdserver/api/v3lock/v3lockpb/v3lock.pb.go文件中包含了通过protoc命令生成server/etcdserver/api/v3lock/v3lockpb/v3lock.proto的服务端代码,其中定义了LcokServer接口及该接口使用到的消息等内容。

服务注册

server/embed/etcd.go中的serveClients函数为每个监听地址启动一个客户服务端协程,首先需要初始化Handler实例、根据配置为grpc服务端生成不同的选项,最后为每个serveCtx启动一个客户服务端协程go func(s *serveCtx) { e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...)) }(sctx)。server/embed/server.go中的serve函数主要的功能就是启动客户服务端v3rpc.Server(s, nil, nil, gopts...) // 在其中会完成GRPC服务的注册

// server/etcdserver/api/v3rpc/grpc.go
func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {var opts []grpc.ServerOptionopts = append(opts, grpc.CustomCodec(&codec{}))if tls != nil {bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls})opts = append(opts, grpc.Creds(bundle.TransportCredentials()))}chainUnaryInterceptors := []grpc.UnaryServerInterceptor{ newLogUnaryInterceptor(s), newUnaryInterceptor(s), grpc_prometheus.UnaryServerInterceptor, }if interceptor != nil { chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor) }chainStreamInterceptors := []grpc.StreamServerInterceptor{ newStreamInterceptor(s), grpc_prometheus.StreamServerInterceptor, }if s.Cfg.ExperimentalEnableDistributedTracing {chainUnaryInterceptors = append(chainUnaryInterceptors, otelgrpc.UnaryServerInterceptor(s.Cfg.ExperimentalTracerOptions...))chainStreamInterceptors = append(chainStreamInterceptors, otelgrpc.StreamServerInterceptor(s.Cfg.ExperimentalTracerOptions...))}opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(chainUnaryInterceptors...)))opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(chainStreamInterceptors...)))opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))grpcServer := grpc.NewServer(append(opts, gopts...)...)pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))       // 注册 KV servicepb.RegisterWatchServer(grpcServer, NewWatchServer(s))      // 注册 Watch servicepb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s)) // 注册 Lease servicepb.RegisterClusterServer(grpcServer, NewClusterServer(s))  // 注册 Cluster servicepb.RegisterAuthServer(grpcServer, NewAuthServer(s))        // 注册 Auth servicepb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s)) // 注册 Maintenance service// server should register all the services manually, use empty service name for all etcd services' health status,// see .md for morehsrv := health.NewServer()hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)healthpb.RegisterHealthServer(grpcServer, hsrv)grpc_prometheus.Register(grpcServer) // set zero values for metrics registered for this grpc serverreturn grpcServer
}

而Lock和Election服务是在上层函数serve中进行注册的,详细代码如下图所示:

/

Raft协议采用分治的思想,把分布式协同的问题分为3个问题:
选举: 一个新的集群启动时,或者老的leader故障时,会选举出一个新的leader;
日志同步: leader必须接受客户端的日志条目并且将他们同步到集群的所有机器;
安全: 保证任何节点只要在它的状态机中生效了一条日志条目,就不会在相同的key上生效另一条日志条目。

更多推荐

ETCD数据库源码分析——etcd gRPC 服务 API

本文发布于:2024-02-14 04:01:23,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1761953.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:源码   数据库   ETCD   API   gRPC

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!