丸趣 TV 小编给大家分享一下 HDFS 中 FileSystem 是什么类,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
首先来看一下, FileSystem(org.apache.hadoop.fs.FileSystem), 这是一个抽象类, 是所有文件系统的父类.
而我们要从 HDFS(Hadoop Distributed FileSystem)下载数据, 应该获取一个 DistributedFileSystem 的实例, 那么如何获取一个 DistributedFileSystem 的实例呢?
FileSystem fs = FileSystem.get(new Configuration());
在 FileSystem 中有 3 个重载的 get()方法
// 1. 通过配置文件获取一个 FileSystem 实例
public static FileSystem get(Configuration conf)
// 2. 通过指定的 FileSystem 的 URI, 配置文件获取一个 FileSystem 实例
public static FileSystem get(URI uri, Configuration conf)
// 3. 通过指定的 FileSystem 的 URI, 配置文件, FileSystem 用户名获取一个 FileSystem 实例
public static FileSystem get(final URI uri, final Configuration conf, final String user)
先调用 FileSystem.get(Configuration conf)方法, 再调用重载方法 FileSystem.get(URI uri, Configuration conf)
public static FileSystem get(URI uri, Configuration conf) throws IOException {
// schem 是 FileSystem 具体的 URI 方案如: file, hdfs, Webhdfs, har 等等
String scheme = uri.getScheme(); // scheme = hdfs
// authority 是 NameNode 的主机名, 端口号
String authority = uri.getAuthority(); // authority = node1:9000
...
// disableCacheName = fs.hdfs.impl.disable.cache
String disableCacheName = String.format(fs.%s.impl.disable.cache , scheme);
// 读取配置文件, 判断是否禁用缓存
if (conf.getBoolean(disableCacheName, false)) { // 若禁用缓存
return createFileSystem(uri, conf); // 直接调用创建 FileSystem 实例的方法
}
// 不禁用缓存, 先从 FileSystem 的静态成员变量 CACHE 中获取 FileSystem 实例
return CACHE.get(uri, conf);
}
再调用 FileSystem$Cache.get(URI uri, Configuration conf)方法(Cache 是 FileSystem 的静态内部类)
FileSystem get(URI uri, Configuration conf) throws IOException{ Key key = new Key(uri, conf); // key = (root (auth:SIMPLE))@hdfs://node1:9000
return getInternal(uri, conf, key);
}
再调用 FileSystem$Cache.getInternal(URI uri, Configuration conf, FileSystem$Cache$Key key)方法(Key 又是 Cache 的静态内部类)
private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
FileSystem fs;
synchronized (this) {
// map 是 Cache 中用来缓存 FileSystem 实例的成员变量, 其类型为 HashMap Key, FileSystem
fs = map.get(key);
}
if (fs != null) { // 如果从缓存 map 中获取到了相应的 FileSystem 实例
return fs; // 则返回这个实例
}
// 否则, 调用 FileSystem.createFileSystem(URI uri, Configuration conf)方法, 创建 FileSystem 实例
fs = createFileSystem(uri, conf);
/* 分割线 1, 期待着 createFileSystem()方法的返回 */
synchronized (this) { // refetch the lock again
/*
* 在多线程环境下, 可能另一个客户端 (另一个线程) 创建好了一个 DistributedFileSystem 实例, 并缓存到了 map 中
* 所以, 这时候就把当前客户端新创建的 DistributedFileSystem 实例注销
* 其实这是一个特殊的单例模式, 一个 key 映射一个 DistributedFileSystem 实例
*/
FileSystem oldfs = map.get(key);
if (oldfs != null) { // a file system is created while lock is releasing
fs.close(); // close the new file system
return oldfs; // return the old file system
}
/*
* now insert the new file system into the map
* 缓存当前新创建的 DistributedFileSystem 实例到 map 中
*/
fs.key = key;
map.put(key, fs);
...
return fs;
}
}
来自分割线 1, 先调用 FileSystem.createFileSystem(URI uri, Configuration conf)方法
private static FileSystem createFileSystem(URI uri, Configuration conf
) throws IOException {
// 通过读取配置文件, 获取 FileSystem 具体的 URI 模式: hdfs 的类对象
Class ? clazz = getFileSystemClass(uri.getScheme(), conf); // clazz = org.apache.hadoop.hdfs.DistributedFileSystem
...
// 反射出一个 DistributedFileSystem 实例
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
// 对 DistributedFileSystem 实例初始化
fs.initialize(uri, conf);
return fs;
}
在调用 DistributedFileSystem.initialize(URI uri, Configuration conf)方法之前, 先来看一下 DistributedFileSystem 类吧.
DistributedFileSystem 是抽象类 FileSystem 的子类实现,
public class DistributedFileSystem extends FileSystem {
...
DFSClient dfs; // DistributedFileSystem 持有一个 DFSClient 类型的成员变量 dfs, 最重要的成员变量!
...
}
调用 DistributedFileSystem.initialize(URI uri, Configuration conf)方法
public void initialize(URI uri, Configuration conf) throws IOException {
...
// new 一个 DFSClient 实例, 成员变量 dfs 引用这个 DFSClient 实例
this.dfs = new DFSClient(uri, conf, statistics );
/* 分割线 2, 期待着 new DFSClient()的返回 */
...
}
在 new DFSClient 实例之前, 先来看一下 DFSClient 类吧! 看一下到底要为哪些成员变量赋值
public class DFSClient implements java.io.Closeable, RemotePeerFactory {
...
final ClientProtocol namenode; //DFSClient 持有一个 ClientProtocol 类型的成员变量 namenode, 一个 RPC 代理对象
/* The service used for delegation tokens */
private Text dtService;
...
}
来自分割线 2, 调用 DFSClient 的构造函数 DFSClient(URI nameNodeUri, Configuration conf, FileSystem$Statistics statistics), 再调用重载构造函数 DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem$Statistics statistics)
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf,
FileSystem.Statistics stats) throws IOException {
...
NameNodeProxies.ProxyAndInfo ClientProtocol proxyInfo = null;
if (numResponseToDrop 0) { // numResponseToDrop = 0
// This case is used for testing.
LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
+ is set to + numResponseToDrop
+ , this hacked client will proactively drop responses
proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
nameNodeUri, ClientProtocol.class, numResponseToDrop);
}
if (proxyInfo != null) { // proxyInfo = null
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
} else if (rpcNamenode != null) { // rpcNamenode = null
// This case is used for testing.
Preconditions.checkArgument(nameNodeUri == null);
this.namenode = rpcNamenode;
dtService = null;
} else { // 前面两个 if 只在测试的情况下成立, 这个 else 的代码块才是重点
...
/*
* 创建一个 NameNodeProxies.ProxyAndInfo ClientProtocol 类型的对象, proxyInfo 引用这个对象
* createProxy(conf, nameNodeUri, ClientProtocol.class)方法是不是和 RPC.getProxy(Class T protocol,
* long clientVersion, InetSocketAddress addr, Configuration conf)很像?
* 没错! 你没看错! 这说明 createProxy()方法内部一定会调用 RPC 的相关方法
* conf 都是 Configuration 类型的 conf
* nameNodeUri = hdfs://node1:9000 这不就是 InetSocketAddress 类型的 addr 的 hostName 和 port
* ClientProtocol.class 都是 RPC protocol 接口的类对象
* ClientProtocol is used by user code via DistributedFileSystem class to communicate
* with the NameNode
* ClientProtocol 是 DistributedFileSystem 用来与 NameNode 通信的
*/
proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
/* 分割线 3, 期待着 createProxy()方法的返回 */
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
}
...
}
来自分割线 3, 调用 NameNodeProxies.createProxy(Configuration conf, URI nameNodeUri, Class T xface)方法
/**
* Creates the namenode proxy with the passed protocol. This will handle
* creation of either HA- or non-HA-enabled proxy objects, depending upon
* if the provided URI is a configured logical URI.
* 通过传过来的 protocol 参数, 创建 namenode 的代理对象. 至于是 HA 还是非 HA 的 namenode 代理对象,
* 这取决于实际搭建的 Hadoop 环境
**/
public static T ProxyAndInfo T createProxy(Configuration conf, URI nameNodeUri, Class T xface)
throws IOException {
// 获取 Hadoop 实际环境中 HA 的配置
Class FailoverProxyProvider T failoverProxyProviderClass =
getFailoverProxyProviderClass(conf, nameNodeUri, xface);
if (failoverProxyProviderClass == null) { // 非 HA, 这里是 Hadoop 的伪分布式搭建
// Non-HA case, 创建一个非 HA 的 namenode 代理对象
return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
UserGroupInformation.getCurrentUser(), true);
} else { // HA
// HA case
FailoverProxyProvider T failoverProxyProvider = NameNodeProxies
.createFailoverProxyProvider(conf, failoverProxyProviderClass, xface,
nameNodeUri);
Conf config = new Conf(conf);
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
config.maxRetryAttempts, config.failoverSleepBaseMillis,
config.failoverSleepMaxMillis));
Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
// 返回一个 proxy, dtService 的封装对象 proxyInfo
return new ProxyAndInfo T (proxy, dtService);
}
}
调用 NameNodeProxies.createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class T xface, UserGroupInformation ugi, boolean withRetries)方法
public static T ProxyAndInfo T createNonHAProxy(Configuration conf, InetSocketAddress nnAddr,
Class T xface, UserGroupInformation ugi, boolean withRetries) throws IOException { Text dtService = SecurityUtil.buildTokenService(nnAddr); //dtService = 192.168.8.101:9000
T proxy;
if (xface == ClientProtocol.class) { // xface = ClientProtocol.class
// 创建一个 namenode 代理对象
proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi, withRetries);
/* 分割线 4, 期待着 createNNProxyWithClientProtocol()方法返回 */
} else if {
...
}
// 把 proxy, dtService 封装成一个 ProxyAndInfo 对象, 并返回
return new ProxyAndInfo T (proxy, dtService);
}
以上是“HDFS 中 FileSystem 是什么类”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注丸趣 TV 行业资讯频道!