Hadoop API: traverse partition directories and submit Spark jobs in parallel based on the data in each directory
2021-04-29 17:26
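Tags: hive, execute, glib, file traversal, hdf, .text, hdfs, catch, mitm
The Hadoop API offers several calls for traversing files, and with them you can walk a file directory tree. The code is given in this order: the directory-traversal main class, the thread that runs the sh script in parallel, the Java code that runs the sh script, submitsparkjob.sh, and the command that runs BatchSubmit.jar.
Original post: http://www.cnblogs.com/yy3b2007com/p/7816917.html
import java.io.FileNotFoundException;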
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
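public class BatchSubmitMain {
    public static void main(String[] args) throws Exception {
        String mrTableName = args[0];    // first argument: MR table name
        String fglibTableName = args[1]; // second argument: fglib table name
        Configuration conf = new Configuration(); // loads core-site.xml from the classpath
        // ...
    }
}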
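The directory walk that BatchSubmitMain is built around (its imports pull in FileSystem, FileStatus and Path) can be sketched as follows. This is a minimal sketch under stated assumptions, not the author's code: it assumes Hive-style partition directories named `key=value`, and the class `PartitionLister`, the method `partitionValues` and the key `objectid` are hypothetical. So that it runs without a cluster it lists a local directory with `java.nio.file` (so `Path` here is `java.nio.file.Path`); against HDFS the corresponding Hadoop calls are `FileSystem.listStatus(Path)`, `FileStatus.isDirectory()` and `FileStatus.getPath().getName()`.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class PartitionLister {
    // Hypothetical helper: returns the values of the immediate "key=value"
    // subdirectories of tableDir, sorted. Local stand-in for listing on HDFS.
    static List<String> partitionValues(Path tableDir, String key) throws IOException {
        String prefix = key + "=";
        List<String> values = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(tableDir)) {
            for (Path entry : entries) {
                String name = entry.getFileName().toString();
                // Skip plain files (e.g. _SUCCESS markers) and directories for other keys
                if (Files.isDirectory(entry) && name.startsWith(prefix)) {
                    values.add(name.substring(prefix.length()));
                }
            }
        }
        Collections.sort(values); // directory listing order is unspecified
        return values;
    }

    public static void main(String[] args) throws IOException {
        Path table = Files.createTempDirectory("table");
        Files.createDirectory(table.resolve("objectid=B"));
        Files.createDirectory(table.resolve("objectid=A"));
        Files.createFile(table.resolve("_SUCCESS")); // a plain file, ignored
        System.out.println(partitionValues(table, "objectid")); // prints [A, B]
    }
}
```

Each returned value would then play the role of the `objectId` handed to one ImportThread.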
import java.util.concurrent.CountDownLatch;

public class ImportThread extends Thread {
    private final JavaShellInvoker javaShellInvoker = new JavaShellInvoker();
    private final CountDownLatch countDownLatch;
    private final String objectId;
    private final String submitShPath;

    public ImportThread(String objectId, String submitShPath, CountDownLatch countDownLatch) {
        this.objectId = objectId;
        this.submitShPath = submitShPath;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + " start... " + this.submitShPath + " " + this.objectId); // print start marker
        try {
            // "mrraster" only selects the log file: ./executeShell_mrraster_<date>.log
            int result = this.javaShellInvoker.executeShell("mrraster", this.submitShPath, this.objectId);
            if (result != 0) {
                System.out.println(name + " failed with exit code " + result);
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(name + "-error:" + e.getMessage());
        } finally {
            this.countDownLatch.countDown(); // decrement the latch even if the script failed
        }
        System.out.println(name + " complete, " + this.countDownLatch.getCount() + " threads remaining"); // print end marker
    }
}
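ImportThread handles one objectId per thread and signals a shared CountDownLatch, so the number of concurrent spark-submit processes equals the number of threads started. When there are many partitions, a fixed-size thread pool caps that number while still letting the caller wait for every job. The sketch below shows that swap (ExecutorService instead of one Thread plus a latch); the names `BoundedSubmitter`, `PartitionJob`, `submitAll` and the `FAILED = -1` convention are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BoundedSubmitter {
    /** Hypothetical per-partition job: returns an exit code, 0 meaning success. */
    interface PartitionJob {
        int run(String objectId) throws Exception;
    }

    /** Exit code recorded when a job throws instead of returning (hypothetical). */
    static final int FAILED = -1;

    // Runs job once per objectId with at most maxParallel jobs in flight and
    // returns objectId -> exit code, in input order.
    static Map<String, Integer> submitAll(List<String> objectIds, int maxParallel, PartitionJob job)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(maxParallel);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (String id : objectIds) {
                Callable<Integer> task = () -> job.run(id);
                futures.add(pool.submit(task)); // queued until a pool thread is free
            }
            Map<String, Integer> results = new LinkedHashMap<>();
            for (int i = 0; i < objectIds.size(); i++) {
                int code;
                try {
                    code = futures.get(i).get(); // blocks until this job finishes
                } catch (ExecutionException e) {
                    code = FAILED; // the job threw
                }
                results.put(objectIds.get(i), code);
            }
            return results;
        } finally {
            pool.shutdown(); // no new tasks; idle threads exit
        }
    }
}
```

In BatchSubmitMain the job could be `id -> new JavaShellInvoker().executeShell("mrraster", submitShPath, id)`, with `maxParallel` sized to what the YARN queue can run at once.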
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;

public class JavaShellInvoker {
    // Log file pattern: ./executeShell_<commandType>_<yyyy-MM-dd>.log
    private static final String EXECUTE_SHELL_LOG_FILE = "./executeShell_%s_%s.log";

    /** Runs "sh shellCommand args" and returns the script's exit code (0 means success). */
    public int executeShell(String shellCommandType, String shellCommand, String args) throws Exception {
        args = (args == null) ? "" : args;
        String now = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        File logFile = new File(String.format(EXECUTE_SHELL_LOG_FILE, shellCommandType, now));

        ProcessBuilder pb = new ProcessBuilder("sh", shellCommand, args);
        // Append both stdout and stderr of the script to the daily log file
        pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
        pb.redirectError(ProcessBuilder.Redirect.appendTo(logFile));

        Process process = null;
        try {
            process = pb.start();
            return process.waitFor(); // block until the script exits
        } catch (Exception ex) {
            System.out.println("executeShell-error:" + ex.getMessage());
            throw ex;
        } finally {
            // process is null if pb.start() failed; it is still alive only if waitFor() was interrupted
            if (process != null && process.isAlive()) {
                process.destroy();
            }
        }
    }
}
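JavaShellInvoker waits for the script with no time limit. In YARN cluster mode spark-submit stays attached until the application finishes by default (`spark.yarn.submit.waitAppCompletion` defaults to true), so one hung job keeps its thread blocked and the latch never reaches zero. A variant with a timeout, built on `Process.waitFor(long, TimeUnit)` and `destroyForcibly()` (both Java 8), might look like this; `TimedShellInvoker`, `run` and the `TIMED_OUT = -1` convention are hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

class TimedShellInvoker {
    /** Exit code reported when the script had to be killed (hypothetical). */
    static final int TIMED_OUT = -1;

    // Starts command, appending stdout and stderr to logFile, and waits at most
    // timeout. Returns the exit code, or TIMED_OUT after killing the process.
    static int run(List<String> command, File logFile, long timeout, TimeUnit unit)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true); // merge stderr into stdout
        pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
        Process process = pb.start();
        try {
            if (process.waitFor(timeout, unit)) {
                return process.exitValue(); // finished in time
            }
            process.destroyForcibly().waitFor(); // kill and reap the hung process
            return TIMED_OUT;
        } finally {
            if (process.isAlive()) { // e.g. the wait was interrupted
                process.destroyForcibly();
            }
        }
    }
}
```

For the article's script the command would be `Arrays.asList("sh", submitShPath, objectId)`; the timeout should comfortably exceed the longest expected Spark job.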
#!/bin/sh
. ../login.sh   # POSIX sh has no "source"; "." is the portable form
spark-submit --master yarn --deploy-mode cluster --class MySparkJobMainClass \
  --driver-class-path /app/myaccount/service/jars/ojdbc7.jar --jars /app/myaccount/service/jars/ojdbc7.jar \
  --num-executors 20 --driver-memory 6g --executor-cores 1 --executor-memory 8g \
  MySparkJobJar.jar "$1"
hadoop jar BatchSubmit.jar <mrTableName> <fglibTableName>
Article link: http://soscw.com/essay/80158.html