博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
区块链教程Fabric1.0源代码分析Peer peer node start命令实现
阅读量:5796 次
发布时间:2019-06-18

本文共 15985 字,大约阅读时间需要 53 分钟。

  区块链教程Fabric1.0源代码分析Peer peer node start命令实现,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁、回归理性,表面上看相关人才需求与身价似乎正在回落。但事实上,正是初期泡沫的渐退,让人们更多的关注点放在了区块链真正的技术之上。

Fabric 1.0源代码笔记 之 Peer #peer node start命令实现

区块链教程Fabric1.0源代码分析Peer peer node start命令实现

1、peer node加载子命令start和status

peer node加载子命令start和status,代码如下:

func Cmd() *cobra.Command {    nodeCmd.AddCommand(startCmd()) //加载子命令start    nodeCmd.AddCommand(statusCmd()) //加载子命令status    return nodeCmd}var nodeCmd = &cobra.Command{    Use:   nodeFuncName,    Short: fmt.Sprint(shortDes),    Long:  fmt.Sprint(longDes),}//代码在peer/node/node.go

startCmd()代码如下:

其中serve(args)为peer node start的实现代码,比较复杂,本文将重点讲解。
另statusCmd()代码与startCmd()相近,暂略。

func startCmd() *cobra.Command {    flags := nodeStartCmd.Flags()    flags.BoolVarP(&chaincodeDevMode, "peer-chaincodedev", "", false, "Whether peer in chaincode development mode")    flags.BoolVarP(&peerDefaultChain, "peer-defaultchain", "", false, "Whether to start peer with chain testchainid")    flags.StringVarP(&orderingEndpoint, "orderer", "o", "orderer:7050", "Ordering service endpoint") //orderer    return nodeStartCmd}var nodeStartCmd = &cobra.Command{    Use:   "start",    Short: "Starts the node.",    Long:  `Starts a node that interacts with the network.`,    RunE: func(cmd *cobra.Command, args []string) error {        return serve(args) //serve(args)为peer node start的实现代码    },}//代码在peer/node/start.go

注:如下内容均为serve(args)的代码,即peer node start命令执行流程。

2、初始化Ledger(账本)

初始化账本,即如下一条代码:

ledgermgmt.Initialize()//代码在peer/node/start.go

ledgermgmt.Initialize()代码展开如下:

func initialize() {    openedLedgers = make(map[string]ledger.PeerLedger) //openedLedgers为全局变量,存储目前使用的账本列表    provider, err := kvledger.NewProvider() //创建账本Provider实例    ledgerProvider = provider}//代码在core/ledger/ledgermgmt/ledger_mgmt.go

kvledger.NewProvider()代码如下:

func NewProvider() (ledger.PeerLedgerProvider, error) {    idStore := openIDStore(ledgerconfig.GetLedgerProviderPath()) //打开idStore    //创建并初始化blkstorage    attrsToIndex := []blkstorage.IndexableAttr{        blkstorage.IndexableAttrBlockHash,        blkstorage.IndexableAttrBlockNum,        blkstorage.IndexableAttrTxID,        blkstorage.IndexableAttrBlockNumTranNum,        blkstorage.IndexableAttrBlockTxID,        blkstorage.IndexableAttrTxValidationCode,    }    indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}    blockStoreProvider := fsblkstorage.NewProvider(        fsblkstorage.NewConf(ledgerconfig.GetBlockStorePath(), ledgerconfig.GetMaxBlockfileSize()),        indexConfig)    //创建并初始化statedb    var vdbProvider statedb.VersionedDBProvider    if !ledgerconfig.IsCouchDBEnabled() {        vdbProvider = stateleveldb.NewVersionedDBProvider()    } else {        vdbProvider, err = statecouchdb.NewVersionedDBProvider()    }    //创建并初始化historydb    var historydbProvider historydb.HistoryDBProvider    historydbProvider = historyleveldb.NewHistoryDBProvider()    //构造Provider    provider := &Provider{idStore, blockStoreProvider, vdbProvider, historydbProvider}    provider.recoverUnderConstructionLedger()    return provider, nil}//代码在core/ledger/kvledger/kv_ledger_provider.go

Ledger更详细内容,参考:Fabric 1.0源代码笔记 之 Ledger(账本)

3、配置及创建PeerServer、创建EventHubServer(事件中心服务器)

//初始化全局变量localAddress和peerEndpointerr := peer.CacheConfiguration() peerEndpoint, err := peer.GetPeerEndpoint() //获取peerEndpointlistenAddr := viper.GetString("peer.listenAddress") //PeerServer监听地址secureConfig, err := peer.GetSecureConfig() //获取PeerServer安全配置,是否启用TLS、公钥、私钥、根证书//以监听地址和安全配置,创建Peer GRPC ServerpeerServer, err := peer.CreatePeerServer(listenAddr, secureConfig)//创建EventHubServer(事件中心服务器)ehubGrpcServer, err := createEventHubServer(secureConfig)//代码在peer/node/start.go

func createEventHubServer(secureConfig comm.SecureServerConfig) (comm.GRPCServer, error)代码如下:

var lis net.Listenervar err errorlis, err = net.Listen("tcp", viper.GetString("peer.events.address")) //创建ListengrpcServer, err := comm.NewGRPCServerFromListener(lis, secureConfig) //从Listen创建GRPCServerehServer := producer.NewEventsServer(    uint(viper.GetInt("peer.events.buffersize")), //最大进行缓冲的消息数    viper.GetDuration("peer.events.timeout")) //缓冲已满的情况下,往缓冲中发送消息的超时pb.RegisterEventsServer(grpcServer.Server(), ehServer) //EventHubServer注册至grpcServerreturn grpcServer, nil//代码在peer/node/start.go

pb.RegisterEventsServer(grpcServer.Server(), ehServer)代码如下:

func RegisterEventsServer(s *grpc.Server, srv EventsServer) {    s.RegisterService(&_Events_serviceDesc, srv) }//代码在protos/peer/events.pb.go

events(事件服务)更详细内容,参考:Fabric 1.0源代码笔记 之 events(事件服务)

4、创建并启动Chaincode Server,并注册系统链码

代码如下:

ccprovider.EnableCCInfoCache() //ccInfoCacheEnabled = true//创建Chaincode Server//如果peer.chaincodeListenAddress,没有定义或者定义和peerListenAddress相同,均直接使用peerServer//否则另行创建NewGRPCServer用于Chaincode ServerccSrv, ccEpFunc := createChaincodeServer(peerServer, listenAddr)//Chaincode service注册到grpcServer,并注册系统链码registerChaincodeSupport(ccSrv.Server(), ccEpFunc)go ccSrv.Start() //启动grpcServer//代码在peer/node/start.go

ccSrv, ccEpFunc := createChaincodeServer(peerServer, listenAddr)代码如下:创建Chaincode Server。

func createChaincodeServer(peerServer comm.GRPCServer, peerListenAddress string) (comm.GRPCServer, ccEndpointFunc) {    //peer.chaincodeListenAddress,链码容器连接时的监听地址    cclistenAddress := viper.GetString("peer.chaincodeListenAddress")    var srv comm.GRPCServer    var ccEpFunc ccEndpointFunc        //如果peer.chaincodeListenAddress,没有定义或者定义和peerListenAddress相同,均直接使用peerServer    //否则另行创建NewGRPCServer用于Chaincode Server    if cclistenAddress == "" {        ccEpFunc = peer.GetPeerEndpoint        srv = peerServer    } else if cclistenAddress == peerListenAddress {        ccEpFunc = peer.GetPeerEndpoint        srv = peerServer    } else {        config, err := peer.GetSecureConfig()        srv, err = comm.NewGRPCServer(cclistenAddress, config)        ccEpFunc = getChaincodeAddressEndpoint    }    return srv, ccEpFunc}//代码在peer/node/start.go

registerChaincodeSupport(ccSrv.Server(), ccEpFunc)代码如下:Chaincode service注册到grpcServer。

func registerChaincodeSupport(grpcServer *grpc.Server, ccEpFunc ccEndpointFunc) {    userRunsCC := chaincode.IsDevMode() //是否开发模式    ccStartupTimeout := viper.GetDuration("chaincode.startuptimeout") //启动链码容器的超时    if ccStartupTimeout < time.Duration(5)*time.Second { //至少5秒        ccStartupTimeout = time.Duration(5) * time.Second    } else {    }    //构造ChaincodeSupport    ccSrv := chaincode.NewChaincodeSupport(ccEpFunc, userRunsCC, ccStartupTimeout)    scc.RegisterSysCCs() //注册系统链码    pb.RegisterChaincodeSupportServer(grpcServer, ccSrv) //service注册到grpcServer}//代码在peer/node/start.go

ccSrv := chaincode.NewChaincodeSupport(ccEpFunc, userRunsCC, ccStartupTimeout)代码如下:构造ChaincodeSupport。

var theChaincodeSupport *ChaincodeSupportfunc NewChaincodeSupport(getCCEndpoint func() (*pb.PeerEndpoint, error), userrunsCC bool, ccstartuptimeout time.Duration) *ChaincodeSupport {    //即/var/hyperledger/production/chaincodes    ccprovider.SetChaincodesPath(config.GetPath("peer.fileSystemPath") + string(filepath.Separator) + "chaincodes")    pnid := viper.GetString("peer.networkId") //网络ID    pid := viper.GetString("peer.id") //节点ID    //构造ChaincodeSupport    theChaincodeSupport = &ChaincodeSupport{runningChaincodes: &runningChaincodes{chaincodeMap: make(map[string]*chaincodeRTEnv), launchStarted: make(map[string]bool)}, peerNetworkID: pnid, peerID: pid}    ccEndpoint, err := getCCEndpoint() //此处传入ccEpFunc    if err != nil {        theChaincodeSupport.peerAddress = viper.GetString("chaincode.peerAddress")    } else {        theChaincodeSupport.peerAddress = ccEndpoint.Address    }    if theChaincodeSupport.peerAddress == "" {        theChaincodeSupport.peerAddress = peerAddressDefault    }    theChaincodeSupport.userRunsCC = userrunsCC //是否开发模式    theChaincodeSupport.ccStartupTimeout = ccstartuptimeout //启动链码容器的超时    theChaincodeSupport.peerTLS = viper.GetBool("peer.tls.enabled") //是否启用TLS    if theChaincodeSupport.peerTLS {        theChaincodeSupport.peerTLSCertFile = config.GetPath("peer.tls.cert.file")        theChaincodeSupport.peerTLSKeyFile = config.GetPath("peer.tls.key.file")        theChaincodeSupport.peerTLSSvrHostOrd = viper.GetString("peer.tls.serverhostoverride")    }    kadef := 0    // Peer和链码之间的心跳超时,小于或等于0意味着关闭    if ka := viper.GetString("chaincode.keepalive"); ka == "" {        theChaincodeSupport.keepalive = time.Duration(kadef) * time.Second //0    } else {        t, terr := strconv.Atoi(ka)        if terr != nil {            t = kadef //0        } else if t <= 0 {            t = kadef //0        }        theChaincodeSupport.keepalive = time.Duration(t) * time.Second //非0    }    execto := time.Duration(30) * time.Second    //invoke和initialize命令执行超时    if eto := viper.GetDuration("chaincode.executetimeout"); eto <= time.Duration(1)*time.Second {        //小于1秒时,默认30秒    } else {        execto = eto    }    theChaincodeSupport.executetimeout = execto    viper.SetEnvPrefix("CORE")    viper.AutomaticEnv()    replacer := strings.NewReplacer(".", "_")    viper.SetEnvKeyReplacer(replacer)    theChaincodeSupport.chaincodeLogLevel = getLogLevelFromViper("level")    theChaincodeSupport.shimLogLevel = getLogLevelFromViper("shim")    theChaincodeSupport.logFormat = viper.GetString("chaincode.logging.format")    return theChaincodeSupport}//代码在core/chaincode/chaincode_support.go

scc.RegisterSysCCs()代码如下:注册系统链码。

func RegisterSysCCs() {    //cscc、lscc、escc、vscc、qscc    for _, sysCC := range systemChaincodes {        RegisterSysCC(sysCC)    }}代码在core/scc/importsysccs.go

5、注册Admin server和Endorser server

代码如下:

//s.RegisterService(&_Admin_serviceDesc, srv)//var _Admin_serviceDesc = grpc.ServiceDesc{...}//core.NewAdminServer()构造ServerAdmin结构体,ServerAdmin结构体实现type AdminServer interface接口pb.RegisterAdminServer(peerServer.Server(), core.NewAdminServer())//构造结构体Endorser,Endorser结构体实现type EndorserServer interface接口serverEndorser := endorser.NewEndorserServer()//s.RegisterService(&_Endorser_serviceDesc, srv)//var _Endorser_serviceDesc = grpc.ServiceDesc{...}pb.RegisterEndorserServer(peerServer.Server(), serverEndorser)//代码在peer/node/start.go

附type AdminServer interface接口定义:

type AdminServer interface {    GetStatus(context.Context, *google_protobuf.Empty) (*ServerStatus, error)    StartServer(context.Context, *google_protobuf.Empty) (*ServerStatus, error)    GetModuleLogLevel(context.Context, *LogLevelRequest) (*LogLevelResponse, error)    SetModuleLogLevel(context.Context, *LogLevelRequest) (*LogLevelResponse, error)    RevertLogLevels(context.Context, *google_protobuf.Empty) (*google_protobuf.Empty, error)}//代码在protos/peer/admin.pb.go

附type EndorserServer interface接口定义:

type EndorserServer interface {    ProcessProposal(context.Context, *SignedProposal) (*ProposalResponse, error)}//代码在protos/peer/peer.pb.go

6、初始化Gossip服务

bootstrap := viper.GetStringSlice("peer.gossip.bootstrap") //启动节点后gossip连接的初始节点serializedIdentity, err := mgmt.GetLocalSigningIdentityOrPanic().Serialize() //获取签名身份messageCryptoService := peergossip.NewMCS( //构造mspMessageCryptoService(消息加密服务)    peer.NewChannelPolicyManagerGetter(), //构造type channelPolicyManagerGetter struct{}    localmsp.NewSigner(), //构造type mspSigner struct {}    mgmt.NewDeserializersManager()) //构造type mspDeserializersManager struct{}secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager()) //构造mspSecurityAdvisor(安全顾问)secureDialOpts := func() []grpc.DialOption {    var dialOpts []grpc.DialOption    dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()), //MaxRecvMsgSize        grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize()))) //MaxSendMsgSize    dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...) //ClientKeepaliveOptions            if comm.TLSEnabled() {        tlsCert := peerServer.ServerCertificate()        dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCASupport().GetPeerCredentials(tlsCert)))    } else {        dialOpts = append(dialOpts, grpc.WithInsecure())    }    return dialOpts}err = service.InitGossipService(serializedIdentity, peerEndpoint.Address, peerServer.Server(),    messageCryptoService, secAdv, secureDialOpts, bootstrap...) //构造gossipServiceImpldefer service.GetGossipService().Stop()//代码在peer/node/start.go

Gossip更详细内容参考:Fabric 1.0源代码笔记 之 gossip(流言算法)

7、初始化、部署并执行系统链码(scc)

代码如下:

initSysCCs() //初始化系统链码,调用scc.DeploySysCCs("")peer.Initialize(func(cid string) { //初始化所有链    scc.DeploySysCCs(cid) //按chain id部署并运行系统链码})//代码在peer/node/start.go

func Initialize(init func(string))代码如下:

func Initialize(init func(string)) {    chainInitializer = init    var cb *common.Block    var ledger ledger.PeerLedger    ledgermgmt.Initialize()    ledgerIds, err := ledgermgmt.GetLedgerIDs()    for _, cid := range ledgerIds {        ledger, err = ledgermgmt.OpenLedger(cid)        cb, err = getCurrConfigBlockFromLedger(ledger) //获取最新的配置块        err = createChain(cid, ledger, cb)        InitChain(cid) //即调用chainInitializer(cid),即scc.DeploySysCCs(cid)    }}//代码在core/peer/peer.go

createChain(cid, ledger, cb)代码如下:

func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {    envelopeConfig, err := utils.ExtractEnvelope(cb, 0) //获取配置交易    configtxInitializer := configtx.NewInitializer() //构造initializer    gossipEventer := service.GetGossipService().NewConfigEventer() //获取gossipServiceInstance,并构造configEventer    gossipCallbackWrapper := func(cm configtxapi.Manager) {        ac, ok := configtxInitializer.ApplicationConfig()        gossipEventer.ProcessConfigUpdate(&chainSupport{            Manager:     cm,            Application: ac,        })        //验证可疑节点身份,并关闭无效链接        service.GetGossipService().SuspectPeers(func(identity api.PeerIdentityType) bool {            return true        })    }    trustedRootsCallbackWrapper := func(cm configtxapi.Manager) {        updateTrustedRoots(cm)    }    configtxManager, err := configtx.NewManagerImpl(        envelopeConfig,        configtxInitializer,        []func(cm configtxapi.Manager){gossipCallbackWrapper, trustedRootsCallbackWrapper},    )    mspmgmt.XXXSetMSPManager(cid, configtxManager.MSPManager())    ac, ok := configtxInitializer.ApplicationConfig()    cs := &chainSupport{        Manager:     configtxManager,        Application: ac, // TODO, refactor as this is accessible through Manager        ledger:      ledger,    }    c := committer.NewLedgerCommitterReactive(ledger, txvalidator.NewTxValidator(cs), func(block *common.Block) error {        chainID, err := utils.GetChainIDFromBlock(block)        if err != nil {            return err        }        return SetCurrConfigBlock(block, chainID)    })    ordererAddresses := configtxManager.ChannelConfig().OrdererAddresses()    service.GetGossipService().InitializeChannel(cs.ChainID(), c, ordererAddresses)    chains.Lock()    defer chains.Unlock()    chains.list[cid] = &chain{        cs:        cs,        cb:        cb,        committer: c,    }    return nil}//代码在core/peer/peer.go

scc更详细内容参考:Fabric 1.0源代码笔记 之 scc(系统链码)

8、启动peerServer和ehubGrpcServer,并监控系统信号,以及启动Go自带的profiling支持进行调试

代码如下:

serve := make(chan error)sigs := make(chan os.Signal, 1)signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)go func() {    sig := <-sigs //接收系统信号    serve <- nil}go func() {    var grpcErr error    grpcErr = peerServer.Start() //启动peerServer    serve <- grpcErr}err := writePid(config.GetPath("peer.fileSystemPath")+"/peer.pid", os.Getpid()) //写入pidgo ehubGrpcServer.Start() //启动ehubGrpcServerif viper.GetBool("peer.profile.enabled") {    go func() { //启动Go自带的profiling支持进行调试        profileListenAddress := viper.GetString("peer.profile.listenAddress")        profileErr := http.ListenAndServe(profileListenAddress, nil)    }}return <-serve //等待serve//代码在peer/node/start.go

9、按配置文件重新更新模块日志级别

overrideLogModules := []string{"msp", "gossip", "ledger", "cauthdsl", "policies", "grpc"}for _, module := range overrideLogModules {    err = common.SetLogLevelFromViper(module)}flogging.SetPeerStartupModulesMap()//代码在peer/node/start.go

转载于:https://blog.51cto.com/14041296/2313009

你可能感兴趣的文章
Android自学--一篇文章基本掌握所有的常用View组件
查看>>
C语言--static的用法
查看>>
Java基础之Java并发编程:volatile关键字解析
查看>>
Web版RSS阅读器(二)——使用dTree树形加载rss订阅分组列表
查看>>
虚拟化环境下对公司业务服务器实现NLB+SQL高可用(一)
查看>>
Synology NAS 存储系统多路径连接Vmware ESXi 6.5
查看>>
群晖NAS的iSCSI设置
查看>>
Linux DHCP 中继代理
查看>>
lduan HyPer-V 简单管理(六)
查看>>
linux系统密码忘记的5个解决方法
查看>>
排序(冒泡排序,插入排序,希尔排序,选择排序,堆排序)
查看>>
PHP_010 时间日期
查看>>
基础算法题
查看>>
Nginx学习之六-nginx核心进程模型
查看>>
根据文字内容自适应的label && scrollview
查看>>
学习资源
查看>>
SQL中判断字符串中只包含或不包含某种字符的方法
查看>>
openstack运维实战系列(六)之neutron配额调整
查看>>
OpenGL基础
查看>>
绘制多边形
查看>>