Java上传文件至HDFS(Kerberos认证)

Author Avatar
AF_ 02月 21,2020
  • 使用微信扫码分享

上传之前,需要本机安装Hadoop并且配置环境变量,可以参考之前的文章 MacOS 安装Hadoop并配置环境变量,然后修改hosts文件,映射HDFS NameNode IP-主机名。

引入依赖

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-common</artifactId>
   <version>2.7.2</version>
</dependency>
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-hdfs</artifactId>
   <version>2.7.2</version>
</dependency>
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.7.2</version>
</dependency>

Kerberos认证

    /**
     * Configures the JVM and Hadoop {@link Configuration} for Kerberos and
     * performs the keytab login.
     *
     * @param config Hadoop configuration to enrich with Kerberos settings
     * @throws IllegalStateException if the keytab login fails; the original
     *         version only logged the error and continued with an
     *         unauthenticated client, which failed later with confusing errors
     */
    private static void kerberosConfig(Configuration config) {

        // Kerberos principal to authenticate as
        String kerUser = "hdfs/****";

        // Point the JVM at krb5.conf; this file can be copied from
        // /etc/krb5.conf on any node of a Kerberos-enabled cluster.
        System.setProperty("java.security.krb5.conf", "/home/lb/gio-script/krb5.conf");

        // Load the cluster's hdfs-site.xml, fetched from the server and
        // placed locally (or on the classpath under resources/).
        config.addResource(new Path("/home/lb/gio-script/hdfs-site.xml"));

        config.set("fs.default.name", "hdfs://nameservice1/");

        // Switch the authentication mechanism to Kerberos
        config.set("hadoop.security.authentication", "Kerberos");
        try {
            UserGroupInformation.setConfiguration(config);

            // Log in with the keytab matching the principal above,
            // fetched from the server and placed locally.
            UserGroupInformation.loginUserFromKeytab(kerUser, "/home/lb/gio-script/hdfs.keytab");
        } catch (IOException e) {
            log.error("Kerbers认证异常 !", e);
            // Fail fast instead of silently proceeding without credentials.
            throw new IllegalStateException("Kerberos login failed for principal " + kerUser, e);
        }
    }

获取HDFS连接

    /**
     * Builds an authenticated HDFS connection.
     *
     * @return the connected {@link FileSystem}; never {@code null}
     * @throws IllegalStateException if the connection cannot be established;
     *         the original version returned {@code null}, which made every
     *         caller fail later with a NullPointerException
     */
    private static FileSystem getFileSystem() {
        // Fresh HDFS configuration object
        Configuration config = new Configuration();
        // Pin the hdfs:// scheme implementation explicitly (one of several
        // ways to obtain a FileSystem; avoids "No FileSystem for scheme" issues)
        config.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        // Perform the Kerberos login before opening the connection
        kerberosConfig(config);
        try {
            return FileSystem.get(config);
        } catch (IOException e) {
            log.error("获取HDFS连接异常 !", e);
            // Fail fast with the cause preserved instead of returning null
            throw new IllegalStateException("Unable to obtain HDFS FileSystem", e);
        }
    }

上传文件

    /**
     * Uploads a local file to HDFS.
     *
     * @param localPath   path of the local source file
     * @param targetPaths destination path on HDFS
     * @throws IOException if the copy fails
     */
    public void FileUpload(String localPath, String targetPaths) throws IOException {
        // try-with-resources guarantees the FileSystem is closed even when
        // the copy throws; the original leaked it on any exception.
        try (FileSystem fs = getFileSystem()) {
            fs.copyFromLocalFile(new Path(localPath), new Path(targetPaths));
        }
    }

删除文件

    /**
     * Deletes an HDFS path (recursively, so directories are removed too)
     * if it exists.
     *
     * @param targetPath HDFS path to delete
     * @throws Exception if the existence check or delete fails
     */
    public void deleteFile(String targetPath) throws Exception {
        // try-with-resources guarantees the FileSystem is closed even when
        // exists()/delete() throw; the original leaked it on any exception.
        try (FileSystem fs = getFileSystem()) {
            Path path = new Path(targetPath);
            if (fs.exists(path)) {
                // second argument 'true' requests a recursive delete
                boolean deleted = fs.delete(path, true);
                log.debug("Whether to delete the file : {}", deleted);
            } else {
                log.debug("Remove the HDFS file . path not exists .");
            }
        }
    }

查看文件目录

    /**
     * Lists the immediate children of an HDFS directory and prints each
     * entry's name to stdout.
     *
     * @param hPath HDFS directory to list
     * @throws IOException if listing the directory fails
     */
    public void catFolder(String hPath) throws IOException {
        // The original never closed the FileSystem at all; try-with-resources
        // releases it on every exit path.
        try (FileSystem fs = getFileSystem()) {
            for (FileStatus status : fs.listStatus(new Path(hPath))) {
                System.out.println(status.getPath().getName());
            }
        }
    }

连接池的配置方式详见:使用commons-pool2实现hdfs连接池