首页  ·  知识 ·  大数据
Java API 操作 HDFS
mangocool  mangocool  实践应用  编辑:越泽   图片来源:网络
开始学习hadoop时写过一次,一晃就两年了,记忆也模糊起来。现在再次需要时,居然还花了我两小时搜集和整理资料才使代码正常跑起来,如果是两年前就无所谓了,可现在不同,我觉得这种时间完全没

依赖:jdk1.7,hadoop-2.7.2

开发环境:ideaIU-14.1.4

测试环境:win7

建立maven工程Upload2HiveThrift,在pom.xml配置文件添加必要的依赖:


<?xml version="1.0" encoding="UTF-8"?>
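<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">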
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xbdp.upload2hive</groupId>
    <artifactId>upload2hive</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.2.2</version>
        </dependency>
    </dependencies>
</project>






Oper2Hdfs.java类:

package com.xbdp.hdfs;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
/**
 * Created by MANGOCOOL on 2016/8/18.
 */
public class Oper2Hdfs {
    static Configuration conf = new Configuration();
    static FileSystem fs;
    static String path = "/home/hadoop/SW/hadoop/etc/hadoop/";
    static String hdfsUrl = "hdfs://h8:9000/";
    static
    {
        // 如果这些配置放在项目的resources目录下,就不需要加路径,会默认读取
        conf.addResource(new Path(path + "core-site.xml"));
        conf.addResource(new Path(path + "hdfs-site.xml"));
        conf.addResource(new Path(path + "mapred-site.xml"));
        // 设置fs.defaultFS参数,如果没有设置,会出现java.lang.IllegalArgumentException:
        // Wrong FS:hdfs://master:9000/xxx,expected: file:///
        // 也可将hadoop集群中的core-site.xml配置文件拷贝到项目下,这样在读取配置文件时就能够识别hdfs文件系统
        // 读取配置方式,可以不加,即便是集群中配置了standby节点也没关系,会自动识别
        conf.set("fs.defaultFS", hdfsUrl);
        //设置fs.hdfs.impl和fs.file.impl,否则可能出现java.io.IOException: No FileSystem for scheme: hdfs
        //也可以在core-default.xml
        //<property>
        //<name>fs.hdfs.impl</name>
        //<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
        //<description>The FileSystem for hdfs: uris.</description>
        //</property>
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
        try {
            //fs = FileSystem.get(new URI(hdfsUrl), conf, "hadoop");// 获取hdfs实例
            fs = FileSystem.get(conf);// 读取配置方式,可以用这个简单方法
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 上传文件到HDFS
     * @param localPath
     * @param file
     * @throws IOException
     */
    private static void upload2Hdfs(String localPath, String file) throws IOException
    {
        String dst = hdfsUrl + file;
        InputStream in = new BufferedInputStream(new FileInputStream(localPath));
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {
            }
        });
        IOUtils.copyBytes(in, out, 4096, true);
    }
    /**
     * 从HDFS上读取文件
     * @param hdfsPath
     * @param localPath
     * @throws IOException
     */
    private static void readFromHdfs(String hdfsPath, String localPath) throws IOException
    {
        FSDataInputStream hdfsInStream = fs.open(new Path(hdfsPath));
        OutputStream out = new FileOutputStream(localPath);
        byte[] ioBuffer = new byte[1024];
        int readLen = hdfsInStream.read(ioBuffer);
        while(-1 != readLen){
            out.write(ioBuffer, 0, readLen);
            readLen = hdfsInStream.read(ioBuffer);
        }
        out.close();
        hdfsInStream.close();
    }
    /**
     * 删除HDFS上的文件
     * @param hdfsPath
     * @return
     * @throws IOException
     */
    private static boolean deleteFromHdfs(String hdfsPath) throws IOException
    {
        boolean flag = true;
        Path path = new Path(hdfsPath);
        if(fs.exists(path))
        {
            fs.deleteOnExit(path);
        } else
        {
            flag = false;
            System.out.println("路径不存在!");
        }
        return flag;
    }
    /**
     * 创建HDFS目录
     * @param hdfsDir
     * @throws IOException
     */
    public static void createDir(String hdfsDir) throws IOException
    {
        Path path = new Path(hdfsDir);
        fs.mkdirs(path);
        System.out.println("new dir \t" + conf.get("fs.default.name") + " | " + hdfsDir);
    }
    /**
     * 遍历HDFS上的文件和目录
     * @param hdfsDir
     * @throws IOException
     */
    private static void getDirFromHdfs(String hdfsDir) throws IOException
    {
        FileStatus fileList[] = fs.listStatus(new Path(hdfsDir));
        int size = fileList.length;
        for(int i = 0; i < size; i++){
            System.out.println("name:" + fileList[i].getPath().getName() + "\tsize:" + fileList[i].getLen());
        }
    }
    /**
     * main函数
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "E:\\Program Files\\hadoop-2.7.0");
        try {
            createDir("/test");
            String localPath = "E:\\Program Files\\XX-Net-2.9.2/LICENSE.txt";
            String file = "test/LICENSE.txt";
            upload2Hdfs(localPath, file);
            String hdfsPath = hdfsUrl + "test/LICENSE.txt";
            localPath = "/home/LICENSE.txt";
            readFromHdfs(hdfsPath, localPath);
            String hdfsDir = hdfsUrl + "/test";
            getDirFromHdfs(hdfsDir);
            hdfsPath = hdfsUrl + "test/";
            deleteFromHdfs(hdfsPath);
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if(fs != null)
                fs.close();
        }
    }
}





遇到问题:

1、java.io.IOException: No FileSystem for scheme: hdfs

?

1
2
3
4
5
6
7
8
9
java.io.IOException: No FileSystem for scheme: hdfs
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
        at FileCopyToHdfs.readFromHdfs(FileCopyToHdfs.java:65)
        at FileCopyToHdfs.main(FileCopyToHdfs.java:26)

加入以下代码即可:

?

1
2
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

2、java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(II[BI[BIILjava/lang/String;JZ)V

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(II[BI[BIILjava/lang/String;JZ)V
    at org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(Native Method)
    at org.apache.hadoop.util.NativeCrc32.calculateChunkedSumsByteArray(NativeCrc32.java:86)
    at org.apache.hadoop.util.DataChecksum.calculateChunkedSums(DataChecksum.java:430)
    at org.apache.hadoop.fs.FSOutputSummer.writeChecksumChunks(FSOutputSummer.java:202)
    at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:163)
    at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:144)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2254)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:61)
    at com.xbdp.hdfs.Oper2hdfs.uploadToHdfs(Oper2hdfs.java:68)
    at com.xbdp.hdfs.Oper2hdfs.main(Oper2hdfs.java:143)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

这是由于 hadoop.dll 版本不匹配造成的:Hadoop 2.4 之前和之后的版本所需的 hadoop.dll 不同。请下载与所用 Hadoop 版本对应的 hadoop.dll,并放入 Windows 下的 hadoop/bin 目录。

下载地址:https://github.com/steveloughran/winutils

别忘了加入代码:

?

1
System.setProperty("hadoop.home.dir", "E:\\Program Files\\hadoop-2.7.0");

最好把下载的winutils.exe也加入hadoop/bin中。

3、java.io.FileNotFoundException: \home (拒绝访问。)

?

1
2
3
4
5
6
7
8
9
10
11
java.io.FileNotFoundException: \home (拒绝访问。)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:110)
    at com.xbdp.hdfs.Oper2hdfs.readFromHdfs(Oper2hdfs.java:79)
    at com.xbdp.hdfs.Oper2hdfs.main(Oper2hdfs.java:149)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

这个简单:写本地文件时需要的是一个完整的文件路径,而这里只指定了目录(\home),所以被拒绝访问。把文件名补上即可(例如 /home/LICENSE.txt)。



本文作者:mangocool 来源:mangocool
CIO之家 www.ciozj.com 微信公众号:imciow
   
免责声明:本站转载此文章旨在分享信息,不代表对其内容的完全认同。文章来源已尽可能注明,若涉及版权问题,请及时与我们联系,我们将积极配合处理。同时,我们无法对文章内容的真实性、准确性及完整性进行完全保证,对于因文章内容而产生的任何后果,本账号不承担法律责任。转载仅出于传播目的,读者应自行对内容进行核实与判断。请谨慎参考文章信息,一切责任由读者自行承担。
延伸阅读