hadoopHDFS-3
/var/hadoop/dfs/name
我们解决问题的时候,不要依托于一个更复杂的方案去解决一个现有的简单问题
对于大量数据处理要 读缓存 写队列
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务
ZooKeeper是用来保证数据在集群间的事务性一致
(如 前500名 2折 在集群高并发下怎么确定先后顺序?)
zookeeper 角色与特性
Leader:
接受所有Follower的提案请求并统一协调发起提案的投票,负责与所有的Follower进行内部的数据交换
Follower:
直接为客户端服务并参与提案的投票,同时与Leader进行数据交换
Observer:
直接为客户端服务但并不参与提案的投票,同时也与Leader进行数据交换zookeeper
zookeeper 角色与选举
服务在启动的时候是没有角色的 (LOOKING)
角色是通过选举产生的
选举产生一个 leader,剩下的是 follower
选举 leader 原则:
集群中超过半数机器投票选择leader.
假如集群中拥有n台服务器,那么leader必须得到n/2+1台服务器投票
如果 leader 死亡,从新选举 leader
如果死亡的机器数量达到一半,集群挂起
如果无法得到足够的投票数量,就重新发起投票,如果参与投票的机器不足 n/2+1 集群停止工作
如果 follower 死亡过多,剩余机器不足 n/2+1 集群(一般组集群组奇数不组偶数台,防止对半现象,增加冗余)也会停止工作
observer 不计算在投票总设备数量里面
zookeeper 可伸缩扩展性原理与设计
leader 所有写相关操作
follower 读操作与响应leader提议在Observer出现以前,ZooKeeper的伸缩性由Follower来实现,我们可以通过添加Follower节点的数量来保证ZooKeeper服务的读性能。但是随着Follower节点数量的增加,ZooKeeper服务的写性能受到了影响。为什么会出现这种情况?在此,我们需要首先了解一下这个"ZK服务"是如何工作的。zookeeper
zookeeper 可伸缩扩展性原理与设计
客户端提交一个请求,若是读请求,则由每台Server的本地副本数据库直接响应。若是写请求,需要通过一致性协议(Zab)来处理
Zab协议规定:来自Client的所有写请求,都要转发给ZK服务中唯一的Leader,由Leader根据该请求发起一个Proposal。然后,其他的Server对该Proposal进行Vote。之后,Leader对Vote进行收集,当Vote数量过半时Leader会向所有的Server发送一个通知消息。最后,当Client所连接的Server收到该消息时,会把该操作更新到内存中并对Client的写请求做出回应
ZooKeeper 服务器在上述协议中实际扮演了两个职能。它们一方面从客户端接受连接与操作请求,另一方面对操作结果进行投票。这两个职能在 ZooKeeper集群扩展的时候彼此制约
从Zab协议对写请求的处理过程中我们可以发现,增加follower的数量,则增加了对协议中投票过程的压力。因为Leader节点必须等待集群中过半Server响应投票,于是节点的增加使得部分计算机运行较慢,从而拖慢整个投票过程的可能性也随之提高,随着集群变大,写操作也会随之下降
所以,我们不得不,在增加Client数量的期望和我们希望保持较好吞吐性能的期望间进行权衡。要打破这一耦合关系,我们引入了不参与投票的服务器,称为Observer。 Observer可以接受客户端的连接,并将写请求转发给Leader节点。但是,Leader节点不会要求 Observer参加投票。相反,Observer不参与投票过程,仅仅在上述第3歩那样,和其他服务节点一起得到投票结果
Observer的扩展,给 ZooKeeper 的可伸缩性带来了全新的景象。我们现在可以加入很多 Observer 节点,而无须担心严重影响写吞吐量。但他并非是无懈可击的,因为协议中的通知阶段,仍然与服务器的数量呈线性关系。但是,这里的串行开销非常低。因此,我们可以认为在通知服务器阶段的开销不会成为瓶颈
Observer提升读性能的可伸缩性
Observer提供了广域网能力
zookeeper 角色,选举
leader 集群主节点
follower 参与选举的附属节点
observer 不参与选举的节点,同步 leader 的命名空间
zookeeper 安装 2181端口
新建4台主机
1 禁用防火墙和 selinux
2 设置 /etc/hosts ip 主机名对应关系
3 安装 openjdk
ZK 集群的安装配置
1、安装 openjdk 环境
2、解压 创建配置文件
3、设置集群机器 id、ip、port
4、拷贝分发到所有集群节点
5、创建目录和 myid 文件
6、启动服务
7、查看状态
2.解压 创建配置文件
#tar -xf /zookeeper-3.4.10.tar.gz
#mv zookeeper-3.4.10 /usr/local/zookeeper
#cd /usr/local/zookeeper
拷贝配置文件
#cp conf/zoo_sample.cfg conf/zoo.cfg
修改配置文件
#vim conf/zoo.cfg #添加
...
server.1=zk1:2888:3888 #主机名:端口号:端口范围
server.2=zk2:2888:3888
server.3=zk3:2888:3888
server.4=zk4:2888:3888:observer
再一台上配置完成后 将zookeeper同步到所有其他主机
#srync -azSH -delete .......
3.分别在每台 创建目录 zookeeper 配置文件里面的 dataDir 指定的目录
#mkdir /tmp/zookeeper #数据存放的目录
4 分别在每台 指定目录下创建 myid 文件,写入自己的 id 值 id的范围是1~255
server.id 中的 id 与 myid 中的 id 必须一致
zk1: # echo 1 > /tmp/zookeeper/myid
zk2: # echo 2 > /tmp/zookeeper/myid
...
5 启动集群 在所有集群节点执行
#/usr/local/zookeeper/bin/zkServer.sh start
#for i in zk{1..4};do ssh ${i} /usr/local/zookeeper/bin/zkServer.sh start; done
#jps
查看角色 zookeeper管理(4字命令)
#/usr/local/zookeeper/bin/zkServer.sh status #本地查看本机
#{ echo 'stat';sleep 1; }|telnet 192.168.4.10 2181 #远程查看指定
#这里echo输出后会关相当于关闭标准输入 进程也会关闭,下面正规写法
#{ echo 'stat';yes; }|telnet 192.168.4.10 2181 #交互式
#or#
#vim a.sh #zookeeper查看 管理 脚本,非交互式
function zkstatus(){
exec 9<>/dev/tcp/$1/2181 #9<>文件描述符(zookeeper)
echo "$2" >&9 #将输出 输入到目标端口
cat <&9 #将目标端口的输出 输入到cat后输出
exec 9<&- #关闭链接
}
for i in master node{1..3}
do
echo -ne "${i}\t"
zkstatus ${i} stat | grep -P "^Mode"
done
zkstatus node2 ruok
echo
#chmod 755 a.sh
#./a.sh
stat 查看状态信息
ruok命令 监测状态
这里各种 zookeeper交互 命令(4字命令)去官网看手册
Http://www.apache.org/ -> zookeeper
http://zookeeper.apache.org/ -> Documentation(选择对应版本) -> Administrator's Guide -> ZooKeeper Commands: The Four Letter Words
kafka是什么?
Kafka是由LinkedIn开发的一个分布式的消息系统
kafka是使用Scala编写
kafka是一种消息中间件
为什么要使用 kafka
解耦、冗余、提高扩展性、缓冲
保证顺序,灵活,削峰填谷
异步通信
异步: 生产者 生产信息到 kafka里(消息队列) 消费者提取kafka里信息来处理
(同步是生产者生产消息等消费者处理)
kafka 角色与集群结构
producer:生产者,负责发布消息
consumer:消费者,负责读取处理消息
topic:消息的类别
Parition:每个Topic包含一个或多个Partition.
Broker:Kafka集群包含一个或多个服务器
Kafka通过Zookeeper管理集群配置,选举leader
kafka 集群的安装配置
kafka集群的安装配置是依赖 zookeeper的,搭建
kafka 集群之前,首先必须创建好一个可用 zookeeper(保证kafka集群数据一致) 如果不做zookeeper,kafka就是单台的无法组建集群
kafka 集群的安装配置
kafka 集群的安装配置是依赖 zookeeper的,搭建
kafka 集群之前,首先请创建好一个可用 zookeeper集群
安装 openjdk 运行环境
分发 kafka 拷贝到所有集群主机
修改配置文件
启动与验证
kafka 集群安装
1 禁用防火墙和 selinux
2 设置 /etc/hosts ip 主机名对应关系
192.168.4.11 zk1
192.168.4.12 zk2
192.168.4.13 zk3
192.168.4.14 zk4
3 安装 openjdk
4 安装 kafka 到 /usr/local/kafka
#tar -xf /kafka_2.10-0.10.2.1.tgz
#mv kafka_2.10-0.10.2.1/ /usr/local/kafka
5 修改配置文件 config/server.properties
#vim config/server.properties
broker.id= id #值不能相同 一台一个
zookeeper.connect=zk1:2181, zk2:2181, zk3:2181 #集群地址,不用都列出,写一部分可靠的即可
log.dirs=/tmp/kafka-logs #kafka的数据目录
同步安装机配置文件 注意修改broler.id不同
6 在所有主机启动服务kafka
#/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties #daemon后台启动
7 验证:
jps 能看到 kafka 模块
netstat 能看到 9092 被监听
集群验证与消息发布
创建topic (在zk3)
#./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mymsg
查看所有 topic (在zk2)也能看到
#bin/kafka-topics.sh --list --zookeeper localhost:2181
查看刚刚创建的 topic(在zk1)详细信息
#bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mymsg
在两个终端里面,生产者发布消息,消费者读取消息
生产者发布信息zk1
#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mymsg
消费者消费信息zk2 zk3
#bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mymsg --from-beginning
#bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mymsg
from-beginning 是从头开始消费消息
注意 下面最好 重新创建主机
hadoop 高可用
NameNode 高可用
为什么 NameNode 需要高可用
NameNode 是 hdfs 的核心配置,HDFS 又是Hadoop 的核心组件,NameNode 在 Hadoop 集群中至关重要,NameNode机器宕机,将导致集群不可用,如果NameNode 数据丢失将导致整个集群的数据丢失,而 NameNode 的数据的更新又比较频繁,实现 NameNode 高可用势在必行
官方提供了两种解决方案
HDFS with NFS
HDFS with QJM
两种翻案异同
NFS QJM
NN NN
ZK ZK
ZKFailoverController ZKFailoverController
NFS JournalNode
HA 方案对比
都能实现热备
都是一个active NN 和一个 standby NN
都使用Zookeeper 和 ZKFC 来实现自动失效恢复
失效切换都使用 fencing 配置的方法来 active NN
NFS 数据数据共享变更方案把数据存储在共享存储里面,我们还需要考虑 NFS 的高可用设计
(市面上常用的 共享存储高可靠用 DRBD+hearbeat(这里不使用),它对缓存无能为力)
QJM 不需要共享存储,但需要让每一个 DN 都知道两个 NN 的位置,并把块信息和心跳包发送给active和standby这两个 NN
NameNode 高可用方案 (QJM)
为了解决 NameNode 单点故障问题,Hadoop 给出了 HDFS 的高可用HA方案:HDFS 通常由两个NameNode组成,一个处于 active 状态,另一个处于standby 状态。Active NameNode对外提供服务,比如处理来自客户端的 rpc 请求,而 StandbyNameNode 则不对外提供服务,仅同步 ActiveNameNode 的状态,以便能够在它失败时进行切换。
NameNode 高可用架构
一个典型的HA集群,NameNode会被配置在两台独立的机器上,在任何时间上,一个NameNode处于活动状态,而另一个NameNode处于备份状态,活动状态的NameNode会响应集群中所有的客户端,备份状态的NameNode只是作为一个副本,保证在必要的时候提供一个快速的转移。为了让Standby Node与Active Node保持同步,这两个Node都与一组称为JNS的互相独立的进程保持通信(Journal Nodes)。当Active Node上更新了namespace,它将记录修改日志发送给JNS的多数派。Standby noes将会从JNS中读取这些edits,并持续关注它们对日志的变更。Standby Node将日志变更应用在自己的namespace中,当failover发生时,Standby将会在提升自己为Active之前,确保能够从JNS中读取所有的edits,即在failover发生之前Standy持有的namespace应该与Active保持完全同步。
NameNode 更新是很频繁的,为了的保持主备数据的一致性,为了支持快速failover,Standby node持有集群中blocks的最新位置是非常必要的。为了达到这一目的,DataNodes上需要同时配置这两个Namenode的地址,同时和它们都建立心跳链接,并把block位置发送给它们
还有一点非常重要,任何时刻,只能有一个ActiveNameNode,否则将会导致集群操作的混乱,那么两个NameNode将会分别有两种不同的数据状态,可能会导致数据丢失,或者状态异常,这种情况通常称为“split-brain”(脑裂,三节点通讯阻断,即集群中不同的Datanode 看到了不同的Active NameNodes)。对于JNS而言,任何时候只允许一个NameNode作为writer;在failover期间,原来的Standby Node将会接管Active的所有职能,并负责向JNS写入日志记录,这中机制阻止了其他NameNode基于处于Active状态的问题。
系统规划
192.168.4.10m01 NameNode1 Hadoop
192.168.4.20m02 NameNode2 Hadoop
192.168.4.11node1 DataNode Zookeeper HDFS Zookeeper
192.168.4.12node2 DataNode Zookeeper HDFS Zookeeper
192.168.4.13node3 DataNode Zookeeper HDFS Zookeeper
将Journal Nodes(namenode间用于同步,应为要防止namenode挂掉,所以不要创建在namenode上,可另外组建) 这里分配在node{1..3}
secondarynamenode 在高可用里面没有用途
1.准备一个可用的zookeeper集群,可以使用另外的机器 这里利用node{1..3}做
2.禁用防火墙和 selinux 设置 /etc/hosts ip 主机名对应关系 ssh免密码 java环境
#vim /etc/hosts
192.168.4.10 m01
192.168.4.20 m02
192.168.4.11 node1
192.168.4.12 node2
192.168.4.13 node3
注意 之前有/var/hadoop文件 要清空
4 配置
core-site.xml
#vim etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name> #定义namenode组的地址
<value>hdfs://mycluster</value> #定义一个组
</property>
<property>
<name>hadoop.tmp.dir</name> #指定使用hadoop时产生文件的存放目录
<value>/var/hadoop</value>
</property>
<property>
<name>ha.zookeeper.quorum</name> #定义zookeeper集群位置
<value>node1:2181,node2:2181,node3:2181</value>
</property> #多写几即可台做冗余
</configuration>
hdfs-site.xml
#vim etc/hadoop/hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name> #指定hdfs保存数据的副本数量
<value>2</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>mycluster</value> #指定为前面定义的namenode组
</property>
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2</value> #指定namenode组成员有谁
</property>
<property> #rpc通信的端口
<name>dfs.namenode.rpc-address.mycluster.nn1</name>
<value>m01:8020</value> #定义组成员指向的真实主机
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.nn2</name>
<value>m02:8020</value>
</property>
<property> #http通信端口
<name>dfs.namenode.http-address.mycluster.nn1</name>
<value>m01:50070</value>
</property>
<property>
<name>dfs.namenode.http-address.mycluster.nn2</name>
<value>m02:50070</value>
</property>
<property> #指定namenode元数据存储在journalnode中的路径
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://node1:8485;node2:8485;node3:8485/mycluster</value>
</property> #全部声明
<property> #指定journalnode日志文件存储的路径
<name>dfs.journalnode.edits.dir</name>
<value>/var/hadoop/journal</value>
</property>
<property> #指定HDFS客户端故障切换active namenode的java类
<name>dfs.client.failover.proxy.provider.mycluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property> #配置隔离机制为 ssh 通过namenode间使用ssh通信
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<property> #指定秘钥的位置 (免密登录)
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
<property> #开启自动故障转移
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
</configuration>
同步hadoop到所有集群机器
#for i in master{1..2} node{1..3};do ssh ${i} rm -rf /var/hadoop/*; done #如果第一次新做可以省去
#cat rsyncfile.sh
#!/bin/bash
for i in node{1..3} master2
do
rsync -avSH --delete /usr/local/hadoop/etc/hadoop/ ${i}:/usr/local/hadoop/etc/hadoop/ -e 'ssh'
wait
done
初始化集群:
1 在其中一台初始化 zookeeper 集群 在m01(namenode其中一台就可)
#bin/hdfs zkfc -fORMatZK
看到Successfully即成功
2 在定义的节点启动 journalnode(这里是node{1..3})
#sbin/hadoop-daemon.sh start journalnode
3 在其中一台 namenode 上执行格式化命令m01
#bin/hdfs namenode -format
看到Successfully成功即可
4 格式化以后把数据目录拷贝到另一台 namenode m02
#cd /var/hadoop/dfs
#tree .
#rsync -azSH --delete m01:/var/hadoop/dfs /var/hadoop/***目录的来源???
5 初始化 JournalNode 在m01
#./bin/hdfs namenode -initializeSharedEdits
6 先分别停止 JournalNode 服务 node{1..3}
因为初始化他会自动启动 而再启动集群会再启动一遍 如果已启动 集群会报错
#sbin/hadoop-daemon.sh stop journalnode
7 启动 dfs
#./sbin/start-dfs.sh
验证配置
#bin/hadoop dfsadmin -report
查看集群状态
#bin/hdfs haadmin -getServiceState nn1 #active
#bin/hdfs haadmin -getServiceState nn2 #standby
#bin/hadoop fs -ls hdfs://mycluster/
#bin/hadoop fs -mkdir hdfs://mycluster/input
http://192.168.4.10:50070
http://192.168.4.20:50070
验证高可用,关闭 active namenode
#sbin/hadoop-daemon.sh stop namenode
#bin/hdfs haadmin -getServiceState nn2
变为active
#sbin/hadoop-daemon.sh stop namenode
变为standby
#vim etc/hadoop/yarn-site.xml
<configuration>
<!-- Site specific YARN configuration properties -->
<property> #NodeManager上运行的计算框架(附属服务)。需配置成mapReduce_shuffle,才可运行MapReduce程序
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>m01</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>m02</value>
</property>
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.store.class</name> #定义存储的类
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>node1:2181,node2:2181,node3:2181</value>
</property>
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yarn-ha</value>
</property>
</configuration>
将配置同步到集群
启动服务,检查状态
[root@m01 hadoop]# sbin/start-yarn.sh
[root@m01 hadoop]# bin/yarn rmadmin -getServiceState rm1 #active
[root@m01 hadoop]# bin/yarn rmadmin -getServiceState rm2 #standby
[root@m01 hadoop]# bin/yarn node -list
[root@m01 hadoop]# ssh m02
[root@m02 hadoop]# ./sbin/yarn-daemon.sh start resourcemanager
# curl -i http://m01:8088/
# curl -i http://m02:8088/ #会重定向到m01(active)上
验证
[root@m01 hadoop]# sbin/yarn-daemon.sh stop resourcemanager
[root@m01 hadoop]# bin/yarn rmadmin -getServiceState rm2
此时会变成active
[root@m01 hadoop]# sbin/yarn-daemon.sh start resourcemanager
[root@m01 hadoop]# bin/yarn rmadmin -getServiceState rm1
此时会变成standby
注意 做恢复时 对于/var/hadoop这个文件要从正常机器上考一份过来进行恢复 不能做初始化****
对与mapred-site.xml的参数
mapreduce.jobhistory.address
mapreduce.jobhistory.WEBapp.address
是否也有类似的高可用配置?
相关文章