Slide 1
MongoDB and Hadoop
Siyuan Zhou
Software Engineer, MongoDB
Slide 2
Agenda
• Complementary Approaches to Data
• MongoDB & Hadoop Use Cases
• MongoDB Connector Overview and Features
• Examples
Slide 3
Complementary Approaches to Data
Slide 4
Operational: MongoDB
[Use-case spectrum diagram: Real-Time Analytics, Product/Asset Catalogs, Churn Analysis, Recommender, Security & Fraud, Internet of Things, Warehouse & ETL, Risk Modeling, Mobile Apps, Customer Data Mgmt, Trade Surveillance, Predictive Analytics, Single View, Social, Ad Targeting, Sentiment Analysis]
Slide 5
MongoDB
• Fast storage and retrieval
• Easy administration
• Built-in analytical tools
– Aggregation framework (example below)
– JavaScript MapReduce
– Geo/text indexes
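As a minimal sketch of the aggregation framework, the Java snippet below averages ratings per movie server-side. It assumes the current MongoDB Java driver (the com.mongodb.client API) and illustrative database/collection names taken from the ratings schema shown later in this deck:

import java.util.Arrays;
import org.bson.Document;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import static com.mongodb.client.model.Accumulators.avg;
import static com.mongodb.client.model.Aggregates.group;

MongoClient client = MongoClients.create("mongodb://localhost:27017");
// "movielens" / "ratings" are illustrative names, not from the deck
MongoCollection<Document> ratings =
    client.getDatabase("movielens").getCollection("ratings");

// Group by movie and compute the average rating inside MongoDB
for (Document doc : ratings.aggregate(Arrays.asList(
        group("$movieid", avg("avgRating", "$rating"))))) {
    System.out.println(doc.toJson());
}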
Slide 6
Analytical: Hadoop
[Same use-case spectrum diagram as Slide 4]
Slide 7
Hadoop
The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models.
• Terabyte and petabyte datasets
• Data warehousing
• Advanced analytics
• Ecosystem
Slide 8
Operational vs. Analytical: Lifecycle
[Same use-case spectrum diagram as Slide 4]
第9页
Management & Monitoring
Enterprise IT Stack
Application CRM, ERP, Collabsoration, Mobile, BI
Data Management
Operationa l RDBMS
RDBMS
Analytic al
EDW
Security & Auditing
Infrastructure
OS & Virtualization, Compute, Storage, Network
Slide 10
MongoDB & Hadoop Use Cases
Slide 11
Commerce
Applications powered by MongoDB; analysis powered by Hadoop; the two sides are linked by the Hadoop Connector.
MongoDB:
• Products & Inventory
• Recommended products
• Customer profile
• Session management
Hadoop:
• Elastic pricing
• Recommendation models
• Predictive analytics
• Clickstream history
Slide 12
Insurance
Applications powered by MongoDB; analysis powered by Hadoop; the two sides are linked by the Hadoop Connector.
MongoDB:
• Customer profiles
• Insurance policies
• Session data
• Call center data
Hadoop:
• Customer action analysis
• Churn analysis
• Churn prediction
• Policy rates
Slide 13
Fraud Detection
[Diagram: an online payments processing application writes payments to MongoDB; the MongoDB Connector for Hadoop feeds nightly analysis and fraud modeling in Hadoop, combined with 3rd party data sources; results are written back to a results cache in MongoDB, which the fraud detection service reads query-only]
Slide 14
MongoDB Connector for Hadoop
Slide 15
Connector Overview
[Diagram: the Hadoop Connector links MongoDB (single node, replica set, or sharded cluster) to Hadoop (MapReduce, Hive, Pig, Spark), reading and writing either live collections or BSON files alongside text files in HDFS / S3; works with Apache Hadoop, Cloudera CDH, Hortonworks HDP, and Amazon EMR]
Slide 16
Data Movement
Dynamic queries to MongoDB vs. BSON snapshots in HDFS:
• Dynamic queries return the latest data, but put load on the operational database.
• BSON snapshots move the processing load to Hadoop and add only predictable load to MongoDB.
Slide 17
Connector Features and Functionality
• Computes splits to read data
– Single Node, Replica Sets, Sharded Clusters
• Mappings for Pig and Hive
– MongoDB as a standard data source/destination
• Support for
– Filtering data with MongoDB queries (sketch below)
– Authentication
– Reading from Replica Set tags
– Appending to existing collections
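For instance, filtering can be pushed down to MongoDB through the connector's mongo.input.query property, and authentication fits into the standard connection URI. A minimal sketch (host, credentials, and the query itself are illustrative):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Authenticate through the standard MongoDB connection URI
conf.set("mongo.input.uri", "mongodb://user:secret@mydb:27017/db1.collection1");
// Only read documents matching this query; the filter runs on the server
conf.set("mongo.input.query", "{\"rating\": {\"$gte\": 4}}");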
Slide 18
Data Split
• Standalone and replica set
– Split data into chunks (split size is tunable; see the sketch below)
• Cluster
– Unsharded: same as standalone
– Sharded: split per chunk
– Sharded: split per shard
• BSON Files
– .split file stores metadata
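A minimal sketch of tuning the chunk-style splitting for standalone servers and replica sets, assuming the connector's mongo.input.split_size property (treat the property name and its default as assumptions to verify against the connector documentation):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Assumption: mongo.input.split_size sets the per-split size in MB (default 8)
conf.set("mongo.input.split_size", "16");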
Slide 19
MapReduce Configuration
• MongoDB input
– mongo.job.input.format = com.mongodb.hadoop.MongoInputFormat
– mongo.input.uri = mongodb://mydb:27017/db1.collection1
• MongoDB output
– mongo.job.output.format = com.mongodb.hadoop.MongoOutputFormat
– mongo.output.uri = mongodb://mydb:27017/db1.collection2
• BSON input/output
– mongo.job.input.format = com.mongodb.hadoop.BSONFileInputFormat
– mapred.input.dir = hdfs:///tmp/database.bson
– mongo.job.output.format = com.mongodb.hadoop.BSONFileOutputFormat
– mapred.output.dir = hdfs:///tmp/output.bson
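Wired into a Hadoop job, these settings look like the following minimal sketch (the URIs and job name are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;

Configuration conf = new Configuration();
conf.set("mongo.input.uri", "mongodb://mydb:27017/db1.collection1");
conf.set("mongo.output.uri", "mongodb://mydb:27017/db1.collection2");

Job job = Job.getInstance(conf, "mongo-example");
// Read from and write to MongoDB collections instead of HDFS files
job.setInputFormatClass(MongoInputFormat.class);
job.setOutputFormatClass(MongoOutputFormat.class);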
Slide 20
Pig Mappings
• Input: BSONLoader and MongoLoader
data = LOAD 'mongodb://mydb:27017/db.collection'
    USING com.mongodb.hadoop.pig.MongoLoader;
• Output: BSONStorage and MongoInsertStorage
STORE records INTO 'hdfs:///output.bson'
    USING com.mongodb.hadoop.pig.BSONStorage;
Slide 21
Pig Mappings
• Primitive type (Integer, String) => primitive type in Pig
• Document => Tuple defined by the schema
• Document => Tuple of a single Map element (dynamic schema mode)
• Subdocument => Map
• Array => Tuple or Bag
Slide 22
Hive Support
CREATE TABLE mongo_users (id int, name string, age int)
STORED BY "com.mongodb.hadoop.hive.MongoStorageHandler"
WITH SERDEPROPERTIES("mongo.columns.mapping" = "_id,name,age")
TBLPROPERTIES("mongo.uri" = "mongodb://host:27017/test.users");
• Access collections as Hive tables
• Use with MongoStorageHandler or BSONSerDe
Slide 23
Hive Support
• Primitive type (Integer, String) => primitive type in Hive
• Document => a row in Hive
• Field (even in a sub-document) => a field of the row (with schema)
• Array => List
• Subdocument => Struct / Map
Slide 24
Spark Usage
• Use with MapReduce input/output formats
• Create Configuration objects with input/output formats and data URI
• Load/save data using the SparkContext Hadoop file API (see the sketch below)
[Diagram: Spark runs alongside MapReduce on Hadoop/HDFS; both connect to MongoDB]
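A minimal sketch of reading a MongoDB collection directly into Spark through the connector's input format (the URI is illustrative; the deck's own examples on the next slides show the BSON read and MongoDB write paths):

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.bson.BSONObject;
import com.mongodb.hadoop.MongoInputFormat;

Configuration mongoConfig = new Configuration();
mongoConfig.set("mongo.input.uri", "mongodb://mydb:27017/movielens.ratings");

JavaSparkContext sc = new JavaSparkContext("local", "mongo-spark");
// Each record is a (document _id, document) pair
JavaPairRDD<Object, BSONObject> ratings = sc.newAPIHadoopRDD(
    mongoConfig, MongoInputFormat.class, Object.class, BSONObject.class);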
Slide 25
Examples
https://github.com/lovett89/mongodb-hadoop-workshop
Slide 26
Data Schema - Recommendation System
> db.ratings.findOne()
{
  "_id" : ObjectId("5388e41c12569b70c376e9fb"),
  "userid" : 1,
  "movieid" : 122,
  "ts" : ISODate("1996-08-02T11:24:06Z"),
  "rating" : 5
}
Slide 27
Read BSON
// create base BSON Configuration object
Configuration bsonConfig = new Configuration();
bsonConfig.set("mongo.job.input.format",
    "com.mongodb.hadoop.BSONFileInputFormat");

// read each BSON document and project out the movie id
JavaRDD<Object> movies = sc.newAPIHadoopFile(
    HDFS + "/movielens/movies.bson",
    BSONFileInputFormat.class, Object.class, BSONObject.class, bsonConfig)
    .map(new Function<Tuple2<Object, BSONObject>, Object>() {
        @Override
        public Object call(Tuple2<Object, BSONObject> doc) throws Exception {
            return doc._2.get("movieid");
        }
    });
Slide 28
Write BSON
// create BSON output RDD from predictions
JavaPairRDD<Object, BSONObject> predictions = predictedRatings.mapToPair(
    new PairFunction<Rating, Object, BSONObject>() {
        @Override
        public Tuple2<Object, BSONObject> call(Rating rating) throws Exception {
            DBObject doc = BasicDBObjectBuilder.start()
                .add("userid", rating.user())
                .add("movieid", rating.product())
                .add("rating", rating.rating())
                .add("timestamp", new Date())
                .get();
            // null key means an ObjectId will be generated on insert
            return new Tuple2<Object, BSONObject>(null, doc);
        }
    });
Slide 29
Write BSON (cont.)
// create MongoDB output Configuration
Configuration outputConfig = new Configuration();
outputConfig.set("mongo.job.output.format",
    "com.mongodb.hadoop.MongoOutputFormat");
outputConfig.set("mongo.output.uri", MONGODB + "." + OUTPUT);

// the file path is ignored when writing to MongoDB
predictions.saveAsNewAPIHadoopFile(
    "file:///not-applicable",
    Object.class, Object.class,
    MongoOutputFormat.class, outputConfig);
Slide 30
Questions?