Importing files from HDFS into MongoDB with Spark SQL
Published: 2019-06-27

Purpose: use Spark SQL to import a file stored in HDFS into MongoDB.

Required jars: mongo-spark-connector_2.11-2.1.2.jar and mongo-java-driver-3.8.0.jar
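One way to obtain these jars is to declare them as dependencies in an sbt build. The fragment below is a sketch that assumes sbt and Scala 2.11.8 (the `_2.11` suffix comes from the connector jar name, and Spark 2.1.1 is the version shown in the run log later in this post); the two MongoDB coordinates mirror the jars listed above. Spark itself is marked `provided` because `spark-submit` supplies it at runtime. The connector and the driver are not shipped with Spark, so they have to be either bundled into the application jar (for example with an assembly plugin) or passed to `spark-submit` with `--jars`.

```scala
// build.sbt (sketch; versions assumed from the jar names and the run log)
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // Provided by spark-submit at runtime, so it is not packaged with the job
  "org.apache.spark"  %% "spark-sql"             % "2.1.1" % "provided",
  // The two jars this post lists as required
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.1.2",
  "org.mongodb"        % "mongo-java-driver"     % "3.8.0"
)
```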

The Scala code is as follows:

import org.apache.spark.sql.SparkSession

object Exec {
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println("Usage: Exec <hdfsServer> <logPath> <fileName> <mongoHost> <mongoDB> <mongoCollection>")
      System.exit(1)
    }
    val hdfsServer = args(0)      // e.g. "hdfs://master"
    val logPath = args(1)         // e.g. "/user/hdfs/log/"
    val fileName = args(2)        // e.g. "2017-05-04.txt"
    val mongoHost = args(3)       // e.g. "10.15.22.22:27017"
    val mongoDB = args(4)         // MongoDB database name
    val mongoCollection = args(5) // MongoDB collection name

    // No .master(...) here: a master set in code takes precedence over the
    // --master flag of spark-submit, so .master("local") would keep the job off the cluster.
    val spark = SparkSession
      .builder()
      .appName("SparkImportDataToMongo")
      .config("spark.debug.maxToStringFields", 500)
      .getOrCreate()

    val succeeded =
      try {
        // spark.read.json expects JSON Lines input: one JSON object per line
        val df = spark.read.json(hdfsServer + logPath + "/" + fileName)
        df.printSchema()
        // Append every row as a document to mongodb://host:port/db.collection
        df.write
          .mode("append")
          .format("com.mongodb.spark.sql.DefaultSource")
          .option("spark.mongodb.output.uri", "mongodb://" + mongoHost + "/" + mongoDB + "." + mongoCollection)
          .save()
        true
      } catch {
        case ex: Exception =>
          ex.printStackTrace() // full stack trace instead of printf(ex.toString())
          false
      } finally {
        spark.stop()
      }
    if (!succeeded) System.exit(1) // non-zero exit status so callers can detect the failure
  }
}
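The job trusts its six positional arguments and concatenates them directly into the HDFS input path and the MongoDB output URI. As a hedged sketch, that logic can be isolated in plain Scala and checked without a cluster. `ImportArgs` and `ImportArgs.parse` are hypothetical names, not part of Spark or the MongoDB connector. Unlike the original `args.length < 6` check, this version requires exactly six non-blank arguments, and it joins the path segments so that a trailing `/` in `logPath` does not produce a double slash.

```scala
// Hypothetical helper (not part of Spark or the MongoDB connector): holds the six
// positional arguments that Exec receives from spark-submit.
final case class ImportArgs(
    hdfsServer: String,      // e.g. "hdfs://master"
    logPath: String,         // e.g. "/user/hdfs/log/"
    fileName: String,        // e.g. "2017-05-04.txt"
    mongoHost: String,       // e.g. "10.15.22.22:27017"
    mongoDB: String,         // database name
    mongoCollection: String  // collection name
) {
  // Join server, directory and file with exactly one "/" between non-empty parts.
  def inputPath: String =
    Seq(
      hdfsServer.stripSuffix("/"),
      logPath.stripPrefix("/").stripSuffix("/"),
      fileName.stripPrefix("/")
    ).filter(_.nonEmpty).mkString("/")

  // Same shape the job passes to spark.mongodb.output.uri: mongodb://host:port/db.collection
  def outputUri: String = s"mongodb://$mongoHost/$mongoDB.$mongoCollection"
}

object ImportArgs {
  val Usage: String =
    "Usage: Exec <hdfsServer> <logPath> <fileName> <mongoHost> <mongoDB> <mongoCollection>"

  // Returns Left(Usage) unless there are exactly six non-blank arguments.
  def parse(args: Array[String]): Either[String, ImportArgs] = args match {
    case Array(server, path, file, host, db, coll)
        if !Seq(server, path, file, host, db, coll).exists(_.trim.isEmpty) =>
      Right(ImportArgs(server, path, file, host, db, coll))
    case _ =>
      Left(Usage)
  }
}
```

Inside `Exec.main`, `ImportArgs.parse(args)` could replace the manual length check: on `Left(usage)`, print the message and exit with status 1; on `Right(a)`, pass `a.inputPath` to `spark.read.json` and `a.outputUri` to the `spark.mongodb.output.uri` option.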

Run the following command from the Spark installation directory:

./bin/spark-submit  --master spark://11.12.13.14:7077 --class Exec //bigdata/spark-2.1.1-bin-hadoop2.6/examples/ImportDataToMongo.jar hdfs://master /user/hdfs/log/ 2017-05-04.txt 10.15.22.22:27017 mydb data_default_test

Sample run:

[root@master spark-2.1.1-bin-hadoop2.6]#   ./bin/spark-submit  --master spark://11.12.13.14:7077 --class Exec //bigdata/spark-2.1.1-bin-hadoop2.6/examples/ImportDataToMongo.jar hdfs://master /user/hdfs/log/ 2017-05-04.txt 10.15.22.22:27017 mydb data_default_test
18/07/20 23:41:13 INFO spark.SparkContext: Running Spark version 2.1.1
18/07/20 23:41:14 INFO spark.SecurityManager: Changing view acls to: root
18/07/20 23:41:14 INFO spark.SecurityManager: Changing modify acls to: root
18/07/20 23:41:14 INFO spark.SecurityManager: Changing view acls groups to: 
18/07/20 23:41:14 INFO spark.SecurityManager: Changing modify acls groups to: 
18/07/20 23:41:14 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
18/07/20 23:41:14 INFO util.Utils: Successfully started service 'sparkDriver' on port 24073.
18/07/20 23:41:14 INFO spark.SparkEnv: Registering MapOutputTracker
18/07/20 23:41:14 INFO spark.SparkEnv: Registering BlockManagerMaster
18/07/20 23:41:14 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/07/20 23:41:14 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/07/20 23:41:14 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-9c42a710-559b-4c97-b92a-58208a77afeb
18/07/20 23:41:14 INFO memory.MemoryStore: MemoryStore started with capacity 366.3 MB
18/07/20 23:41:14 INFO spark.SparkEnv: Registering OutputCommitCoordinator
18/07/20 23:41:14 INFO util.log: Logging initialized @1777ms
18/07/20 23:41:14 INFO server.Server: jetty-9.2.z-SNAPSHOT
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@c65a5ef{/jobs,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6b5176f2{/jobs/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@b672aa8{/jobs/job,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2fab4aff{/jobs/job/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@ec0c838{/stages,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6e46d9f4{/stages/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5cc69cfe{/stages/stage,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@29cfd92b{/stages/stage/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@21c64522{/stages/pool,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7997b197{/stages/pool/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@11dee337{/storage,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@460f76a6{/storage/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@55f3c410{/storage/rdd,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@11acdc30{/storage/rdd/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@770d4269{/environment,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4a8ab068{/environment/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1922e6d{/executors,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@76a82f33{/executors/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6bab2585{/executors/threadDump,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@74bdc168{/executors/threadDump/json,null,AVAILABLE,@Spark}
18/07/20 23:41:14 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@644c78d4{/static,null,AVAILABLE,@Spark}

Reposted from: http://bxgdl.baihongyu.com/
