分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 技术分享

网站访问量实时统计

发布时间:2023-09-06 02:27责任编辑:苏小强关键词:暂无标签

一、需求:统计网站访问量(实时统计)

技术选型:特点(数据量大、做计算、实时)实时流式计算框架:storm1)spout数据源,接入数据源本地文件2)splitbolt业务逻辑处理切分数据拿到网址3)bolt累加次数求和

1、PvCountSpout类

package com.demo.pvcount;import java.io.BufferedReader;import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.IOException;import java.io.InputStreamReader;import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.IRichSpout;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Values;public class PvCountSpout implements IRichSpout{ ???private SpoutOutputCollector collector; ???private BufferedReader br; ???private String line; ???????@Override ???public void nextTuple() { ???????//发送读取的数据的每一行 ???????try { ???????????while((line = br.readLine())!= null) { ???????????????//发送数据到splitbolt ???????????????collector.emit(new Values(line)); ???????????????//设置延迟 ???????????????Thread.sleep(500); ???????????} ???????} catch (IOException e) { ???????????e.printStackTrace(); ???????} catch (InterruptedException e) { ???????????e.printStackTrace(); ???????} ???} ???@Override ???public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) { ???????this.collector = collector; ???????????????//读取文件 ???????try { ???????????br = new BufferedReader(new InputStreamReader(new FileInputStream("e:/weblog.log"))); ???????} catch (FileNotFoundException e) { ???????????e.printStackTrace(); ???????} ???} ???@Override ???public void declareOutputFields(OutputFieldsDeclarer declarer) { ???????//声明 ???????declarer.declare(new Fields("logs")); ???} ???????//处理tuple成功 回调的方法 ???@Override ???public void ack(Object arg0) { ???} ???//如果spout在失效的模式中 调用此方法来激活 ???@Override ???public void activate() { ???} ???//在spout程序关闭前执行 不能保证一定被执行 kill -9 是不执行 storm kill 是不执行 ???@Override ???public void close() { ???} ???//在spout失效期间,nextTuple不会被调用 ???@Override ???public void deactivate() { ???} ???//处理tuple失败回调的方法 ???@Override ???public void fail(Object arg0) { ???} ???//配置 ???@Override ???public Map<String, Object> getComponentConfiguration() { ???????return null; ???}}

2、PvCountSplitBolt类

package com.demo.pvcount;import java.util.Map;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.IRichBolt;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;public class PvCountSplitBolt implements IRichBolt{ ???private OutputCollector collector; ???????//一个bolt即将关闭时调用 不能保证一定被调用 资源清理 ???@Override ???public void cleanup() { ???} ???private int pvnum = 0; ???//业务逻辑 分布式 集群 并发度 线程 (接收tuple然后进行处理) ???@Override ???public void execute(Tuple input) { ???????//1.获取数据 ???????String line = input.getStringByField("logs"); ???????????????//2.切分数据 ???????String[] fields = line.split("\t"); ???????String session_id = fields[1]; ???????????????//3.局部累加 ???????if (session_id != null) { ???????????//累加 ???????????pvnum++; ???????????//输出 ???????????collector.emit(new Values(Thread.currentThread().getId(),pvnum)); ???????} ???} ???//初始化调用 ???@Override ???public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) { ???????this.collector = collector; ???} ???//声明 ???@Override ???public void declareOutputFields(OutputFieldsDeclarer declarer) { ???????//声明输出 ???????declarer.declare(new Fields("threadid","pvnum")); ???} ???//配置 ???@Override ???public Map<String, Object> getComponentConfiguration() { ???????return null; ???}}

3、PvCountSumBolt类

package com.demo.pvcount;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.IRichBolt;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.tuple.Tuple;public class PvCountSumBolt implements IRichBolt{ ???private HashMap<Long, Integer> hashMap = new HashMap<>(); ???????@Override ???public void cleanup() { ???} ???//全局累加求和 业务逻辑 ???@Override ???public void execute(Tuple input) { ???????//1.获取数据 ???????Long threadid = input.getLongByField("threadid"); ???????Integer pvnum = input.getIntegerByField("pvnum"); ???????????????//2.创建集合 存储(threadid,pvnum) 15 20 ???????hashMap.put(threadid, pvnum); ???????????????//3.累加求和(拿到集合中所有value值) ???????Iterator<Integer> iterator = hashMap.values().iterator(); ???????????????//4.清空之前的数据 ???????int sumnum = 0; ???????while (iterator.hasNext()) { ???????????sumnum += iterator.next(); ???????} ???????????????System.err.println(Thread.currentThread().getName() + "总访问量为->" + sumnum); ???} ???@Override ???public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) { ???} ???@Override ???public void declareOutputFields(OutputFieldsDeclarer arg0) { ???} ???@Override ???public Map<String, Object> getComponentConfiguration() { ???????return null; ???}}

4、PvCountDriver类

package com.demo.pvcount;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.tuple.Fields;public class PvCountDriver { ???public static void main(String[] args) { ???????// 1.hadoop->Job storm->topology 创建拓扑 ???????TopologyBuilder builder = new TopologyBuilder(); ???????// 2.指定设置 ???????builder.setSpout("PvCountSpout", new PvCountSpout(), 1); ???????builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 6).setNumTasks(4) ???????????????.fieldsGrouping("PvCountSpout", new Fields("logs")); ???????builder.setBolt("PvCountSumBolt", new PvCountSumBolt(), 1).fieldsGrouping("PvCountSplitBolt", new Fields("pvnum")); ???????// 3.创建配置信息 ???????Config conf = new Config(); ???????conf.setNumWorkers(2); ???????// 4.提交任务 ???????LocalCluster localCluster = new LocalCluster(); ???????localCluster.submitTopology("pvcounttopology", conf, builder.createTopology()); ???}}

5、PvCountDriver_Shuffle类

package com.demo.pvcount;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.topology.TopologyBuilder;public class PvCountDriver_Shuffle { ???public static void main(String[] args) { ???????// 1.hadoop->Job storm->topology 创建拓扑 ???????TopologyBuilder builder = new TopologyBuilder(); ???????// 2.指定设置 ???????builder.setSpout("PvCountSpout", new PvCountSpout(), 1); ???????builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 6).setNumTasks(4) ???????????????.shuffleGrouping("PvCountSpout"); ???????builder.setBolt("PvCountSumBolt", new PvCountSumBolt(), 2).shuffleGrouping("PvCountSplitBolt"); ???????// 3.创建配置信息 ???????Config conf = new Config(); ???????conf.setNumWorkers(2); ???????// 4.提交任务 ???????LocalCluster localCluster = new LocalCluster(); ???????localCluster.submitTopology("pvcounttopology", conf, builder.createTopology()); ???}}

6、weblog.log文件

storm.apache.org ???EEH6Y21245GHI899OFG4V9U567 ???2018-08-07 10:40:49storm.apache.org ???VVVYH6Y4V4SFXZWWEQRQWEQ ???2018-08-07 08:40:50storm.apache.org ???BBYH61456DEL89RG5VV9UYU7 ???2018-08-07 10:40:49storm.apache.org ???EEH6Y21245GHI899OFG4V9U567 ???2018-08-07 09:40:49storm.apache.org ???CCYH6Y4V4SCVXTG6DPB4VH9U123 ???2018-08-07 10:40:49storm.apache.org ???CCYH6Y4V4SCVXTG6DPB4VH9U123 ???2018-08-07 12:40:49storm.apache.org ???VVVYH6Y4V4SFXZWWEQRQWEQ ???2018-08-07 08:40:52storm.apache.org ???CCYH6Y4V4SCVXTG6DPB4VH9U123 ???2018-08-07 08:40:50storm.apache.org ???VVVYH6Y4V4SFXZWWEQRQWEQ ???2018-08-07 09:40:49...
...
...storm.apache.org ???EEH6Y21245GHI899OFG4V9U567 ???2018-08-07 08:40:53storm.apache.org ???BBYH61456DEL89RG5VV9UYU7 ???2018-08-07 12:40:49storm.apache.org ???EEH6Y21245GHI899OFG4V9U567 ???2018-08-07 08:40:51storm.apache.org ???EEH6Y21245GHI899OFG4V9U567 ???2018-08-07 10:40:49storm.apache.org ???HUNTERH6YCGFJYERTT834R52FDXV9U34 ???2018-08-07 08:40:53storm.apache.org ???BBYH61456DEL89RG5VV9UYU7 ???2018-08-07 08:40:50storm.apache.org ???EEH6Y21245GHI899OFG4V9U567 ???2018-08-07 08:40:53storm.apache.org ???VVVYH6Y4V4SFXZWWEQRQWEQ ???2018-08-07 10:40:49

7、运行(4)中的main方法,控制台显示如下图:

此时在weblog.log文件中增加几条数据,则总访问量相应增加几条。

至此,简单实现了网站访问量实时统计。

网站访问量实时统计

原文地址:https://www.cnblogs.com/areyouready/p/10188300.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved