---
layout: post
title: Flink starter program - WordCount
category: Tech
tags: BigData
keywords:
description:
---

* TOC
{:toc}

### WordCount code

```java
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class WordCount {

    private static final Logger log = LoggerFactory.getLogger(WordCount.class);

    public static final String ZK_HOSTS = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

    public static void main(String[] args) throws Exception {
        // ---------- parse arguments ----------
        ParameterTool tool = ParameterTool.fromArgs(args);
        String brokers = tool.get("brokers");
        String topic = tool.get("topic");

        Properties properties = new Properties();
        String brokerServerList = brokers;  // e.g. "192.168.3.8:9092"
        String firstTopic = topic;          // e.g. "beam-on-flink"
        String secondTopic = "beam-on-flink-res";
        properties.setProperty("bootstrap.servers", brokerServerList);
        properties.setProperty("group.id", "consumer-flink");
        properties.setProperty("zookeeper.connect", ZK_HOSTS);

        // ---------- execution environment ----------
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        // ---------- source ----------
        FlinkKafkaConsumer011<String> flinkKafkaConsumer011 =
                new FlinkKafkaConsumer011<>(firstTopic, new SimpleStringSchema(), properties);
        DataStreamSource<String> source = env.addSource(flinkKafkaConsumer011);

        // ---------- transform ----------
        DataStream<WC> flatMap = source.flatMap(new Splitter()).uid("2. split data");
        KeyedStream<WC, Tuple> keyBy = flatMap.keyBy(0);
        WindowedStream<WC, Tuple, TimeWindow> window = keyBy.timeWindow(Time.seconds(10));
        DataStream<WC> dataStream = window.sum(1).uid("3. sum");
        DataStream<String> dataStreamRes = dataStream.map(WC::toString);

        // ---------- sink ----------
        // write the results back to Kafka
        sink2Kafka(brokerServerList, secondTopic, dataStreamRes);

        // ---------- execute ----------
        env.execute("Window WordCount");
    }

    private static void sink2Kafka(String brokerServerList, String secondTopic,
                                   DataStream<String> dataStreamRes) {
        FlinkKafkaProducer011<String> sink2Kafka =
                new FlinkKafkaProducer011<>(brokerServerList, secondTopic, new SimpleStringSchema());
        dataStreamRes.addSink(sink2Kafka);
    }

    public static class Splitter implements FlatMapFunction<String, WC> {
        @Override
        public void flatMap(String sentence, Collector<WC> out) {
            for (String word : sentence.split(" ")) {
                out.collect(new WC(word, 1));
                log.info("word:{},count:{}", word, 1);
            }
        }
    }

    /**
     * A POJO-style replacement for the raw Tuple2: named getters/setters are
     * kept in sync with the underlying tuple fields.
     */
    public static class WC extends Tuple2<String, Integer> {

        private String word;

        private Integer count;

        public WC() {
            super();
        }

        public WC(String word, Integer count) {
            super(word, count);
            this.word = word;
            this.count = count;
        }

        public String getWord() {
            return getField(0);
        }

        public void setWord(String word) {
            this.word = word;
            setField(word, 0);
        }

        public Integer getCount() {
            return getField(1);
        }

        public void setCount(Integer count) {
            this.count = count;
            setField(count, 1);
        }

        public String toJsonString() {
            return JSON.toJSONString(this);
        }

        @Override
        public String toString() {
            return toJsonString();
        }
    }
}
```
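Before wiring the job to Kafka, it can help to smoke-test the same pipeline locally. The sketch below is a minimal variant, assuming a text socket on `localhost:9999` (e.g. started with `nc -lk 9999`) in place of the Kafka source and `print()` in place of the Kafka sink; the split → keyBy → window → sum chain is the same as above. The class name `LocalWordCount` is just a placeholder for this sketch.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class LocalWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read lines from a local socket instead of Kafka
        // (assumption: a server is listening, e.g. started with `nc -lk 9999`).
        env.socketTextStream("localhost", 9999)
                // same split -> keyBy -> window -> sum chain as the Kafka job
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) {
                        for (String word : sentence.split(" ")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .sum(1)
                // print to stdout instead of sinking to Kafka
                .print();

        env.execute("Local Window WordCount");
    }
}
```

Type each line into the `nc` session and the per-window counts appear on stdout roughly every ten seconds, which makes it easy to verify the pipeline logic before adding the Kafka connectors.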
### pom dependencies

```xml
<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.55</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.12</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.12</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-gelly_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-gelly-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils-junit</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
        <type>test-jar</type>
    </dependency>
    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-all</artifactId>
        <version>1.10.19</version>
        <type>jar</type>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
        <type>test-jar</type>
    </dependency>
    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-math3</artifactId>
        <version>3.5</version>
    </dependency>
    <dependency>
        <groupId>org.influxdb</groupId>
        <artifactId>influxdb-java</artifactId>
        <version>2.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- shade plugin: bundles the job and its dependencies into a fat jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

### Build the code

`mvn clean package`

### Upload the code to the Flink platform

![flink-web-submit](//raw.githubusercontent.com/George5814/blog-pic/master/image/flink/flink-web-submit.png)

![flink-web-submit-2](//raw.githubusercontent.com/George5814/blog-pic/master/image/flink/flink-web-submit-2.png)

### Job monitoring on the Flink platform

![flink-job-running](//raw.githubusercontent.com/George5814/blog-pic/master/image/flink/flink-job-running.png)
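Besides the web UI upload shown above, the shaded jar can also be submitted with the `flink run` CLI. A sketch, where `target/wordcount-1.0.jar` is a placeholder for whatever jar name your `artifactId` produces, and the broker/topic values are the examples from the code comments; `-c` names the entry class, and the trailing `--brokers`/`--topic` pairs are picked up by `ParameterTool.fromArgs`:

```sh
# submit the job to a running Flink cluster from the command line
flink run -c WordCount target/wordcount-1.0.jar \
    --brokers 192.168.3.8:9092 \
    --topic beam-on-flink
```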