package basic import ( "github.com/armen/dp/consensus/paxos" "github.com/armen/dp/link" ) func (n *Node) initProp() { // Setup the pp2p.Deliver event dispatcher n.pp2p.Deliver(func(p link.Peer, m link.Message) { n.mux <- func() { switch msg := m.(type) { case paxos.Promise: n.onPromise(msg.Ballot, msg.Accepted, msg.Val) case paxos.Accepted: n.onAccepted(msg.Ballot, msg.Val) case paxos.Nack: n.onNack(msg.Ballot) } } }) } func (n *Node) onPropose() { if n.decided { return } n.ts++ n.numOfAccepts = 0 n.promises = nil n.vals = make(map[*paxos.Ballot]interface{}) go n.beb.Broadcast(paxos.Prepare{&paxos.Ballot{n.ts, n.ID()}}) } // Propose proposes value v for consensus func (n *Node) Propose(v interface{}) { n.mux <- func() { n.propVal = v n.onPropose() } } func (n *Node) onPromise(ballot *paxos.Ballot, accepted *paxos.Ballot, val interface{}) { if ballot.Ts != n.ts || ballot.Pid != n.ID() { return } n.vals[accepted] = val n.promises = append(n.promises, accepted) if len(n.promises) != (n.N()+1)/2 { return } maxBallot := n.promises[0] if val, ok := n.vals[maxBallot]; ok && val != nil { n.propVal = val } b := &paxos.Ballot{n.ts, n.ID()} go n.beb.Broadcast(paxos.Accept{b, n.propVal}) } func (n *Node) onAccepted(ballot *paxos.Ballot, val interface{}) { if ballot.Ts != n.ts || ballot.Pid != n.ID() { return } n.numOfAccepts++ if n.numOfAccepts != (n.N()+1)/2 { return } go n.beb.Broadcast(paxos.Decided{ballot, val}) } func (n *Node) onNack(ballot *paxos.Ballot) { if ballot.Ts == n.ts && ballot.Pid == n.ID() { n.onPropose() } }