1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| package main
import ( "fmt" "net/http" "nsq_client/config" "sync" "time"
"github.com/nsqio/go-nsq" )
// ServerConsumerHandler is the package-level ServerConsumer instance. It is
// created with an empty, non-nil DataServerAddrs map so that entries can be
// written to it without further initialisation.
var ServerConsumerHandler = &ServerConsumer{
	DataServerAddrs: map[string]time.Time{},
}
// ServerConsumer keeps a table of data-server addresses together with a
// timestamp for each one, guarded by a read/write mutex.
type ServerConsumer struct {
	// DataServerAddrs maps a data-server address to a timestamp.
	DataServerAddrs map[string]time.Time

	// rwLocker guards DataServerAddrs. Because the struct embeds a mutex by
	// value, it must be used through a pointer and never copied.
	rwLocker sync.RWMutex
}
func NewConsumer(topic string, chanName string, h nsq.Handler) (consumer *nsq.Consumer, err error) { if consumer, err = nsq.NewConsumer(topic, chanName, nsq.NewConfig()); err != nil { return nil, err } consumer.ChangeMaxInFlight(3) consumer.AddHandler(h) err = consumer.ConnectToNSQLookupd(Conf.NSQ_LOOKUPD_ADDR) if err != nil { return nil, err } return consumer, nil }
func (h *ServerConsumer) HandleMessage(message *nsq.Message) error { if dataServer := string(message.Body); dataServer != "" { h.rwLocker.Lock() h.DataServerAddrs[dataServer] = time.Now() h.rwLocker.Unlock() } return nil }
func MonitorDataServerAddrs() { consumer, err := NewConsumer("data_server_addr", "ch1", ServerConsumerHandler) if err != nil { panic(err) } err = consumer.ConnectToNSQLookupd(Conf.NSQ_LOOKUPD_ADDR) if err != nil { panic(err) } }
func (h *ServerConsumer) removeExpireDatasServers() { for { h.rwLocker.Lock() for dataServer, t := range h.DataServerAddrs { if t.Add(10 * time.Second).Before(time.Now()) { delete(h.DataServerAddrs, dataServer) } } h.rwLocker.Unlock() time.Sleep(2 * time.Second) } }
// Conf holds the process-wide configuration. It is nil until main assigns the
// result of config.GetConfig() to it, so it must not be dereferenced before
// that point.
var Conf *config.Cfgparams
func main() {
Conf = config.GetConfig() fmt.Println(Conf)
server := http.Server{ Addr: "127.0.0.1:9000", } MonitorDataServerAddrs() if err := server.ListenAndServe(); err != nil { panic(err) } }
|