这个很实用,JAVA操作HDFS工具类

一、HDFSUtil 工具类:补充删除文件的方法

/**

* 删除某一路径

* @param path 需要删除的路径

* @param recursive 指定为true删除目录中全部文件,false时可以删除空目录和单个文件

* @return

*/

/**
 * Deletes the file or directory at the given HDFS path.
 *
 * @param path      path to delete
 * @param recursive true to delete a directory together with all of its
 *                  contents; false to delete only an empty directory or a
 *                  single file
 * @return true if the deletion succeeded, false otherwise
 */
public boolean delete(String path, boolean recursive) {
    // The original duplicated the whole try/catch in both branches, with the
    // only difference being the literal passed to FileSystem.delete; forward
    // the caller's flag directly instead.
    try {
        return fs.delete(new Path(path), recursive);
    } catch (Exception e) {
        // Keep the original best-effort contract: log and report failure.
        e.printStackTrace();
        return false;
    }
}

获取某一路径下的文件信息

/**

* 获得某一路径下的文件信息

* @param path 待查看路径

* @return 文件信息列表-包含文件类型,文件大小,所有者,所在组,文件名称

*/

/**
 * Lists the entries directly under the given HDFS path.
 *
 * @param path directory to inspect
 * @return one formatted line per entry: type, size, owner, group, name
 */
public List<String> getFileInfo(String path) {
    List<String> infos = new ArrayList<>();
    try {
        // Ask the FileSystem for the status of every child of the path.
        for (FileStatus status : fs.listStatus(new Path(path))) {
            StringBuilder line = new StringBuilder();
            // Directories are always reported with a fixed size of 0.
            if (status.isDirectory()) {
                line.append("目录\t").append("0").append("\t");
            } else {
                line.append("文件\t").append(sizeFormat(status.getLen())).append("\t");
            }
            // Append owner, group and the bare file name.
            line.append(status.getOwner()).append("\t")
                .append(status.getGroup()).append("\t")
                .append(status.getPath().getName());
            infos.add(line.toString());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return infos;
}

文件大小单位换算

/**

* 文件大小单位换算

* @param length 默认获得的文件大小单位为Byte-字节

* @return 使用1024进行换算

*/

/**
 * Converts a raw byte count into a human-readable size string.
 *
 * @param length file size in bytes (the unit HDFS reports by default)
 * @return the size expressed in B, KB, or MB using 1024-based integer division
 */
private String sizeFormat(long length) {
    // Smaller than 1 KB: report raw bytes.
    if (length / 1024 == 0) {
        return length + "B";
    }
    long kb = length / 1024;
    // Smaller than 1 MB: report whole kilobytes.
    if (kb / 1024 == 0) {
        return kb + "KB";
    }
    // Everything else is reported in whole megabytes.
    return kb / 1024 + "MB";
}

将本地文件内容写入HDFS文件中

create():覆盖原文件

append():直接在原文件基础上进行追加

/**

* 将本地磁盘文件内容写入HDFS文件中

* @param src 源文件路径

* @param parentDir 目标文件父级目录

* @param fileName 目标文件名称

* @param overwrite 是否覆盖写入

* @return

*/

/**
 * Writes the contents of a local text file into a file on HDFS.
 *
 * create(): overwrites the target file.
 * append(): appends to the existing target file.
 *
 * @param src       local source file path
 * @param parentDir parent directory of the target file on HDFS
 * @param fileName  target file name
 * @param overwrite true to overwrite the target, false to append to it
 * @return true on success, false on any failure
 */
public boolean write(String src, String parentDir, String fileName, boolean overwrite) {
    // Bail out early if the local source file does not exist.
    if (!new File(src).exists()) {
        System.out.println("源文件不存在");
        return false;
    }
    boolean isDir = false;
    try {
        // HDFS requires the parent path to be a directory; merely existing is
        // not enough, because the path could be occupied by a regular file.
        isDir = fs.isDirectory(new Path(parentDir));
    } catch (Exception e) {
        e.printStackTrace();
    }
    if (!isDir) { // false -> either a regular file or nothing exists there
        try {
            // Try to create the parent directory.
            fs.mkdirs(new Path(parentDir));
        } catch (Exception e) {
            // mkdirs fails when a regular file already occupies the path.
            e.printStackTrace();
            System.out.println("该路径不可用");
            return false;
        }
    }
    // HDFS paths always use '/'; the original used File.separator, which
    // produces a backslash on Windows and an invalid HDFS path.
    Path destPath = new Path(parentDir + "/" + fileName);
    FSDataOutputStream fsDataOutputStream;
    try {
        if (overwrite) {
            // create() with overwrite=true replaces any existing file.
            fsDataOutputStream = fs.create(destPath, true);
        } else {
            // Ensure the file exists (createNewFile returns false and leaves
            // the file alone when it already does), then open it for append.
            fs.createNewFile(destPath);
            fsDataOutputStream = fs.append(destPath);
        }
    } catch (Exception e) {
        // Without a valid output stream there is nothing left to do; the
        // original fell through here and raised an NPE later instead.
        e.printStackTrace();
        return false;
    }
    // try-with-resources guarantees both streams are closed even when the
    // copy loop fails (the original leaked them on every error path);
    // closing the writer also flushes and closes the HDFS stream.
    try (BufferedReader bufferedReader = new BufferedReader(
             new InputStreamReader(new FileInputStream(new File(src)), "UTF-8"));
         BufferedWriter bufferedWriter = new BufferedWriter(
             new OutputStreamWriter(fsDataOutputStream, "UTF-8"))) {
        String temp;
        int line = 0;
        while ((temp = bufferedReader.readLine()) != null) {
            bufferedWriter.write(temp);
            bufferedWriter.newLine();
            line++;
            // Flush every thousand lines to bound buffered memory.
            if (line % 1000 == 0) {
                bufferedWriter.flush();
            }
        }
        bufferedWriter.flush();
    } catch (IOException e) {
        e.printStackTrace();
        return false;
    }
    return true;
}

读取 HDFS 中的文件内容

/**

* 从指定文件中读取数据

* @param path HDFS路径

*/

/**
 * Reads a text file from HDFS and prints each line to standard output.
 *
 * @param path HDFS path of the file to read
 */
public void read(String path) {
    try {
        // open() yields an input stream over the HDFS file; wrap it so the
        // content can be consumed line by line as UTF-8 text.
        FSDataInputStream fsDataInputStream = fs.open(new Path(path));
        BufferedReader bufferedReader =
                new BufferedReader(new InputStreamReader(fsDataInputStream, "UTF-8"));
        for (String temp = bufferedReader.readLine(); temp != null;
                temp = bufferedReader.readLine()) {
            System.out.println(temp);
        }
        bufferedReader.close();
        fsDataInputStream.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

二、PropertiesUtil工具类

此工具类的作用是获取 properties 文件中的配置项。

import java.io.IOException;

import java.util.Properties;

/**
 * Loads a .properties file from the classpath and exposes simple key lookups.
 */
public class PropertiesUtil {

    private String fileName;

    private Properties properties = new Properties();

    /**
     * Creates the util and immediately loads the given classpath resource.
     *
     * @param fileName resource name on the classpath, e.g. "system.properties"
     */
    public PropertiesUtil(String fileName) {
        this.fileName = fileName;
        open();
    }

    /**
     * Loads the resource into the backing Properties object.
     *
     * The original passed the raw getResourceAsStream() result straight to
     * Properties.load(), which throws a NullPointerException when the
     * resource is missing, and never closed the stream; both are fixed here.
     */
    private void open() {
        try (InputStream in =
                Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName)) {
            if (in == null) {
                // Missing resource: report it and leave the properties empty.
                System.out.println("配置文件不存在: " + fileName);
                return;
            }
            properties.load(in);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * @param key property key to look up
     * @return the property value, or null when the key is absent or the
     *         file failed to load
     */
    public String readPropertyByKey(String key) {
        return properties.getProperty(key);
    }
}

三、RemoteUtil工具类

此工具类的作用是直接通过传入hdfs语句对hdfs进行操作

注意事项:

通过SSH的工具类实现命令执行时需要将命令的全路径写出

远程登录需要导入 ganymed-ssh2.jar 包。

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStream;

import java.io.InputStreamReader;

import org.apache.commons.lang.StringUtils;

import ch.ethz.ssh2.Connection;

import ch.ethz.ssh2.Session;

import ch.ethz.ssh2.StreamGobbler;

/**
 * Executes shell commands on a remote host over SSH (ganymed-ssh2).
 *
 * Note: commands should be given with their full path unless the remote
 * shell's PATH has been sourced first.
 */
public class RemoteUtil {

    private static String DEFAULTCHART = "UTF-8";

    private Connection conn;

    private String host;

    private String userName;

    private String userPwd;

    /**
     * @param host     remote host name or address
     * @param userName SSH login user
     * @param userPwd  SSH login password
     */
    public RemoteUtil(String host, String userName, String userPwd) {
        this.host = host;
        this.userName = userName;
        this.userPwd = userPwd;
    }

    /**
     * Connects to the host and authenticates with the stored credentials.
     *
     * @return true when password authentication succeeded
     */
    public Boolean login() {
        boolean flg = false;
        try {
            conn = new Connection(host);
            conn.connect(); // establish the TCP/SSH connection
            flg = conn.authenticateWithPassword(userName, userPwd); // authenticate
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flg;
    }

    /**
     * Runs a single command on the remote host and returns its output.
     *
     * @param cmd command line to execute
     * @return stdout of the command, or stderr when stdout was blank
     */
    public String execute(String cmd) {
        String result = "";
        try {
            if (login()) {
                System.out.println("登录成功");
                Session session = conn.openSession(); // one session per command
                try {
                    session.execCommand(cmd);
                    // Collect the command's standard output first.
                    result = processStdout(session.getStdout(), DEFAULTCHART);
                    // A blank stdout usually means the command failed, so
                    // fall back to standard error for diagnostics.
                    if (result == null || result.trim().isEmpty()) {
                        result = processStdout(session.getStderr(), DEFAULTCHART);
                    }
                } finally {
                    // Close the session before its parent connection; the
                    // original closed the connection first and leaked both
                    // whenever an exception was thrown.
                    session.close();
                    conn.close();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Drains a remote output stream into a single string.
     *
     * @param in      raw stream obtained from the SSH session
     * @param charset charset used to decode the bytes
     * @return the decoded content with "\n" after each line
     */
    private String processStdout(InputStream in, String charset) {
        StringBuilder buffer = new StringBuilder();
        // StreamGobbler buffers the remote console output in the background;
        // try-with-resources closes the reader chain on every path.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(new StreamGobbler(in), charset))) {
            String line;
            while ((line = br.readLine()) != null) {
                buffer.append(line).append("\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return buffer.toString();
    }

    /**
     * Overrides the charset used to decode remote output (default UTF-8).
     */
    public static void setCharset(String charset) {
        DEFAULTCHART = charset;
    }
}

四、对工具类的测试

配置文件 system.properties:

hostName=SZ01

hdfsPort=8020

hadoopUser=bigdata

hadoopPwd=bigdata

hadoopBinHome=/home/bigdata/hadoop-2.7.2/bin

userDataDir=/input/user

1

2

3

4

5

6

测试类 UtilTest.java:

import java.util.List;

import com.sand.util.HDFSUtil;

import com.sand.util.PropertiesUtil;

import com.sand.util.RemoteUtil;

/**
 * Manual smoke test for the HDFS/Properties/Remote utility classes.
 */
public class UtilTest {

    public static void main(String[] args) {
        // Read connection settings from the classpath properties file.
        PropertiesUtil props = new PropertiesUtil("system.properties");
        String host = props.readPropertyByKey("hostName");
        String userName = props.readPropertyByKey("hadoopUser");
        String userPwd = props.readPropertyByKey("hadoopPwd");

        // 1) List the HDFS root directory via the Java API.
        HDFSUtil hdfsUtil = new HDFSUtil(host);
        for (String entry : hdfsUtil.getFileInfo("/")) {
            System.out.println(entry);
        }

        // 2) Do the same through an SSH session running the hdfs CLI.
        RemoteUtil remoteUtil = new RemoteUtil(host, userName, userPwd);
        String bin = props.readPropertyByKey("hadoopBinHome");
        // Using the command's absolute path works without any shell setup.
        System.out.println(remoteUtil.execute(bin + "/hdfs dfs -ls /"));
        // Alternatively, source the profile so PATH resolves the command;
        // '&&' chains several commands in one invocation.
        System.out.println(remoteUtil.execute("source .bash_profile && hdfs dfs -ls /"));
    }
}

运行结果:

五、通过Web操作HDFS上传本地文件至HDFS

upload.jsp:

<%@ page language="java" contentType="text/html; charset=UTF-8"
pageEncoding="UTF-8"%>

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Insert title here</title>
</head>
<body>
<!-- Posts the chosen file to UploadServlet; multipart/form-data is required
     for file uploads and matches the servlet's @MultipartConfig. -->
<form action="UploadServlet" method="post" enctype="multipart/form-data">
<input type="file" name="data" />
<input type="submit" value="上传" />
</form>
</body>
</html>

UploadServlet.java:

import java.io.File;

import java.io.IOException;

import java.io.PrintWriter;

import java.util.List;

import java.util.UUID;

import javax.servlet.ServletException;

import javax.servlet.annotation.MultipartConfig;

import javax.servlet.annotation.WebServlet;

import javax.servlet.http.HttpServlet;

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpServletResponse;

import javax.servlet.http.Part;

import com.sand.util.HDFSUtil;

import com.sand.util.PropertiesUtil;

/**

* Servlet implementation class UploadServlet

*/

@WebServlet("/UploadServlet")

@MultipartConfig

/**
 * Servlet implementation class UploadServlet
 *
 * Receives a multipart file upload, stages it in a local directory, copies it
 * into the per-user directory on HDFS, then echoes the resulting directory
 * listing back to the client as plain text.
 */
@WebServlet("/UploadServlet")
@MultipartConfig
public class UploadServlet extends HttpServlet {

    private static final long serialVersionUID = 1L;

    /**
     * Handles the upload. The form posts here via doPost, which delegates to
     * this method; multipart parsing is enabled by @MultipartConfig.
     *
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        // Emit plain UTF-8 text.
        response.setCharacterEncoding("UTF-8");
        response.setContentType("text/plain; charset=UTF-8");
        // The uploaded file arrives as a Part named after the form field.
        Part part = request.getPart("data");
        // Local staging directory for the uploaded bytes.
        String path = "E://userData/";
        // Use a UUID so concurrent uploads never collide.
        String fileName = UUID.randomUUID().toString();
        // Placeholder: should come from the logged-in user's session.
        String userId = "1";
        // Build the staging path once so the write and the later HDFS upload
        // refer to the same file; the original wrote to
        // path + File.separator + fileName but uploaded path + fileName.
        String localPath = path + fileName;
        part.write(localPath);
        // Read HDFS connection settings from the classpath properties file.
        PropertiesUtil propertiesUtil = new PropertiesUtil("system.properties");
        String hostName = propertiesUtil.readPropertyByKey("hostName");
        String userDataDir = propertiesUtil.readPropertyByKey("userDataDir");
        HDFSUtil hdfsUtil = new HDFSUtil(hostName);
        // Copy the staged file into HDFS under <userDataDir><userId>/<name>.
        hdfsUtil.upLoad(true, true, new String[]{localPath}, userDataDir + userId + "/" + fileName);
        // Remove the local staging copy once it has been uploaded.
        new File(localPath).delete();
        PrintWriter printWriter = response.getWriter();
        // Report the user's directory contents back to the browser.
        List<String> list = hdfsUtil.getFileInfo(userDataDir + userId);
        for (String info : list) {
            printWriter.write(info + "\n");
        }
        printWriter.flush();
        printWriter.close();
    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        // The upload form posts here; reuse the doGet implementation.
        doGet(request, response);
    }
}

展示hdfs文件系统中的文件

从根目录开始展示,当目标是目录时,可以继续点击展示其中的内容

CheckMsgServlet.java:

import java.io.IOException;

import java.util.List;

import javax.servlet.ServletException;

import javax.servlet.annotation.WebServlet;

import javax.servlet.http.HttpServlet;

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpServletResponse;

import com.sand.util.HDFSUtil;

import com.sand.util.PropertiesUtil;

/**

* Servlet implementation class CheckMsgServlet

*/

@WebServlet("/CheckMsgServlet")

/**
 * Servlet implementation class CheckMsgServlet
 *
 * Renders an HDFS directory listing through show.jsp. Without parameters the
 * root directory is shown; with method=dir&path=... the given directory is
 * shown instead (the links produced by HDFSUtil.getFileInfo drive the
 * drill-down navigation).
 */
@WebServlet("/CheckMsgServlet")
public class CheckMsgServlet extends HttpServlet {

    private static final long serialVersionUID = 1L;

    public CheckMsgServlet() {
        super();
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        String method = request.getParameter("method");
        System.out.println(method);
        response.setCharacterEncoding("UTF-8");
        response.setContentType("text/plain; charset=UTF-8");
        PropertiesUtil propertiesUtil = new PropertiesUtil("system.properties");
        String host = propertiesUtil.readPropertyByKey("hostName");
        // Use the Java API (not SSH) to read the file information; the
        // original also read hadoopUser/hadoopPwd but never used them.
        HDFSUtil hdfsUtil = new HDFSUtil(host);
        // Both branches of the original differed only in the listed path,
        // so resolve the path first and forward once.
        String path = null;
        if (method == null) {
            path = "/";
        } else if ("dir".equals(method)) {
            path = request.getParameter("path");
            System.out.println(path);
        }
        if (path != null) {
            List<String> list = hdfsUtil.getFileInfo(path);
            request.setAttribute("infoList", list);
            request.getRequestDispatcher("show.jsp").forward(request, response);
        }
    }

    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        doGet(request, response);
    }
}

show.jsp

用到 JSTL 标签库,需要导入 JSTL 的 jar 包。

<%@ page language="java" contentType="text/html; charset=UTF-8"
pageEncoding="UTF-8"%>

<%@taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Insert title here</title>
</head>
<body>
<%-- Iterate over the "infoList" request attribute set by CheckMsgServlet;
     directory entries already contain an <a> link for drill-down. --%>
<c:forEach items="${infoList}" var="info">
${info}<br />
</c:forEach>
</body>
</html>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

此实现方式较为简单,由于未包装返回项目信息的实体类,所以直接在HDFSUtil中通过判断封装了a标签进行判断跳转

HDFSUtil.java:

/**

* 获得某一路径下的文件信息

* @param path 待查看路径

* @return 文件信息列表-包含文件类型,文件大小,所有者,所在组,文件名称

*/

/**
 * Lists the entries under the given HDFS path, wrapping directory names in
 * an anchor that links back to CheckMsgServlet for drill-down navigation.
 *
 * @param path directory to inspect
 * @return one formatted line per entry: type, size, owner, group, name
 */
public List<String> getFileInfo(String path) {
    List<String> infos = new ArrayList<>();
    try {
        // Fetch the status of every entry directly under the path.
        for (FileStatus status : fs.listStatus(new Path(path))) {
            String info;
            if (status.isDirectory()) {
                // Directories get a fixed size of 0 and a clickable link.
                info = "目录\t" + "0" + "\t"
                        + status.getOwner() + "\t" + status.getGroup() + "\t"
                        + "<a href='CheckMsgServlet?method=dir&path=" + status.getPath()
                        + "'>" + status.getPath().getName() + "</a>";
            } else {
                // Plain files show their formatted size and bare name.
                info = "文件\t" + sizeFormat(status.getLen()) + "\t"
                        + status.getOwner() + "\t" + status.getGroup() + "\t"
                        + status.getPath().getName();
            }
            infos.add(info);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return infos;
}

发表评论
留言与评论(共有 0 条评论)
   
验证码:

相关文章

推荐文章

'); })();