Hands on Neo4j

This is the script of a hands-on session on Neo4j. The example refers to a typical retail example [ER diagram] with orders [CSV], products [CSV] and users [CSV].

getting Neo4j running on your machine using Docker

docker pull neo4j
docker run -p 7474:7474 -p 7687:7687 -d neo4j

then open http://localhost:7474 and enter the password (neo4j)

let’s see an example where neo4j rocks

Relational data describing a taxonomy

The categories

categories
ID    Name
0     top
1     sub1
2     sub2
11    sub1.1
12    sub1.2
121   sub1.2.1

The relationships among the categories

relationships
src   dest
0     1
0     2
1     11
1     12
12    121

let’s now create this taxonomy in neo4j

syntax
  • nodes CREATE (<variable>:<type> {<field>: <value>}) and
  • edges CREATE (<variable>)-[:<type>]->(<variable>)
example
CREATE (top:Category {name: "top"})
CREATE (sub1:Category {name: "sub1"})
CREATE (sub2:Category {name: "sub2"})
CREATE (sub11:Category {name: "sub1.1"})
CREATE (sub12:Category {name: "sub1.2"})
CREATE (sub121:Category {name: "sub1.2.1"})
CREATE (top)-[:narrower]->(sub1),
(top)-[:narrower]->(sub2),
(sub1)-[:narrower]->(sub11),
(sub1)-[:narrower]->(sub12),
(sub12)-[:narrower]->(sub121)

let’s query it in cypher

Syntax
MATCH (<variable>?:<type>? {<field>: <value>}*)
-[<variable>?:<type>? {<field>: <value>}*]->?
(<variable>?:<type>? {<field>: <value>}*)?
WHERE <condition>?
RETURN <variable>+

examples

walking one edge

All the nodes that can be reached from a node named top following one edge of type narrower

SQL
SELECT *
FROM relationships AS R JOIN categories AS C ON R.src = C.ID
WHERE C.name = 'top'
cypher
MATCH p=(a)-[:narrower]->() WHERE a.name="top" RETURN p

NOTE: not such a big difference …
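To try the SQL side without installing a database server, here is a minimal sketch that loads the two tables above into an in-memory SQLite database and runs the one-edge query. Using SQLite (through Python's sqlite3 module) is an assumption of this sketch; the session itself does not prescribe a relational engine.

```python
import sqlite3

# In-memory database holding the two tables from the text above.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE categories (ID INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE relationships (src INTEGER, dest INTEGER);
INSERT INTO categories VALUES
  (0, 'top'), (1, 'sub1'), (2, 'sub2'),
  (11, 'sub1.1'), (12, 'sub1.2'), (121, 'sub1.2.1');
INSERT INTO relationships VALUES
  (0, 1), (0, 2), (1, 11), (1, 12), (12, 121);
""")

# One join is enough to walk a single edge starting from 'top'.
rows = conn.execute("""
SELECT R.dest
FROM relationships AS R JOIN categories AS C ON R.src = C.ID
WHERE C.name = 'top'
ORDER BY R.dest
""").fetchall()

one_hop = [dest for (dest,) in rows]
print(one_hop)
```

The result lists the IDs of sub1 and sub2, the same two nodes the Cypher pattern reaches.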

walking down the entire taxonomy

all the nodes that can be reached from a node named top following any number of edges of type narrower

SQL
SELECT R.dest, R1.dest, R2.dest
FROM relationships AS R JOIN categories AS C ON R.src = C.ID LEFT JOIN
     relationships AS R1 ON R1.src = R.dest LEFT JOIN
     relationships AS R2 ON R2.src = R1.dest
WHERE C.name = 'top'

NOTE: what if the taxonomy was deeper? What if you don’t know how deep the taxonomy is?

cypher
MATCH p=(a)-[:narrower*]->() WHERE a.name="top" RETURN p

NOTE: Wow! This is easy!!! Thank you cypher!
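What the variable-length pattern asks for is, conceptually, a graph traversal of unknown depth. The sketch below is only an illustration of that idea on the relationships table above (it is not how Neo4j is implemented): a breadth-first walk that keeps following edges until no new node shows up. The function name reachable is made up for this example.

```python
from collections import deque

# The relationships table from the text, as (src, dest) pairs.
EDGES = [(0, 1), (0, 2), (1, 11), (1, 12), (12, 121)]


def reachable(start, edges):
    """Return every node reachable from start over one or more edges."""
    children = {}
    for src, dest in edges:
        children.setdefault(src, []).append(dest)

    seen = set()
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in seen:  # visit each node only once
                seen.add(child)
                queue.append(child)
    return seen


print(sorted(reachable(0, EDGES)))
```

No depth appears anywhere in the code, which is exactly what the fixed chain of SQL joins cannot express.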

controlling the number of walks

all the nodes that can be reached from a node named top following a minimum of 2 and a maximum of 3 edges of type narrower

MATCH p=(a)-[:narrower*2..3]->() WHERE a.name="top" RETURN p
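To see which paths a bounded pattern such as *2..3 selects, here is a small sketch that enumerates them on the relationships table above. It is an illustration only; the function name paths and its parameters are hypothetical.

```python
# The relationships table from the text, as (src, dest) pairs.
EDGES = [(0, 1), (0, 2), (1, 11), (1, 12), (12, 121)]


def paths(start, edges, min_hops, max_hops):
    """Enumerate paths from start whose edge count lies in [min_hops, max_hops]."""
    children = {}
    for src, dest in edges:
        children.setdefault(src, []).append(dest)

    found = []

    def walk(path):
        hops = len(path) - 1
        if hops >= min_hops:
            found.append(path)
        if hops == max_hops:  # never go deeper than the upper bound
            return
        for child in children.get(path[-1], []):
            walk(path + [child])

    walk([start])
    return found


for p in paths(0, EDGES, 2, 3):
    print(p)
```

Starting from top (ID 0), two paths have two edges and one path has three; the branch through sub2 is too short to qualify.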

let’s add some relational data

syntax

importing users

LOAD CSV FROM
'http://emanueledellavalle.org/data/neo4j/retail/users.csv' AS line
CREATE (:User {uid: toInteger(line[0]), name: line[1]})

importing products

LOAD CSV FROM
'http://emanueledellavalle.org/data/neo4j/retail/products.csv' AS line
CREATE (:Product {uid: line[0], brand: line[2], name: line[1]})

importing and merging orders

LOAD CSV FROM
'http://emanueledellavalle.org/data/neo4j/retail/orders.csv' AS line
MATCH (user:User {uid: toInteger(line[1])})
MATCH (product:Product {uid: line[2]})
MERGE (order:Order {id: toInteger(line[0])})
ON CREATE SET order.date = line[4]
MERGE (order)-[:contains {qty: toInteger(line[3])}]->(product)
MERGE (order)-[:orderedBy]->(user)

NOTE: the MERGE clause ensures that a pattern exists in the graph. Either the pattern already exists, or it needs to be created.
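The match-or-create behaviour described in the note can be sketched in a few lines. This is a toy model over a Python list, not Neo4j's implementation; merge_node, its parameters and the date values are all made up for the illustration.

```python
def merge_node(graph, label, key, value, on_create=None):
    """Return the node matching (label, key=value); create it only if absent.

    The second element of the result tells whether a node was created.
    """
    for node in graph:
        if node["label"] == label and node.get(key) == value:
            return node, False
    node = {"label": label, key: value}
    if on_create:
        # Mirrors ON CREATE SET: applied only when the node is new.
        node.update(on_create)
    graph.append(node)
    return node, True


graph = []
first, created_first = merge_node(graph, "Order", "id", 1, on_create={"date": "2018-09-11"})
again, created_again = merge_node(graph, "Order", "id", 1, on_create={"date": "ignored"})
print(created_first, created_again, len(graph))
```

This is why the orders file can mention the same order on several lines (one per product) and still produce a single Order node.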

let’s add a category to a product and see neo4j rocking again!

MATCH (category:Category {name: "sub1.2.1"})
MATCH (product:Product {uid: "a"})
MERGE (product)-[:topic]->(category)

then, as a separate query, find the orders that contain a product whose category is anywhere below sub1

MATCH (order)-[:contains]->(product),
      (product)-[:topic]->(category),
      (supercategory)-[:narrower*]->(category)
WHERE supercategory.name="sub1"
RETURN order, product, category

NOTE: WOW! In SQL this is, in general, impossible to write with a fixed number of joins!
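The query above combines a traversal of unknown depth with two ordinary lookups. The sketch below spells out those steps on toy data: the taxonomy comes from the text, while the TOPIC and CONTAINS mappings (which product has which category, which order contains which product) are hypothetical values chosen for the illustration.

```python
# narrower edges from the text, keyed by category name.
NARROWER = {
    "top": ["sub1", "sub2"],
    "sub1": ["sub1.1", "sub1.2"],
    "sub1.2": ["sub1.2.1"],
}
# Hypothetical data: product -> category, order id -> products.
TOPIC = {"a": "sub1.2.1", "b": "sub2"}
CONTAINS = {1: ["a"], 2: ["b"], 3: ["a", "b"]}


def descendants(category):
    """All categories below the given one, at any depth."""
    result = set()
    stack = [category]
    while stack:
        for child in NARROWER.get(stack.pop(), []):
            if child not in result:
                result.add(child)
                stack.append(child)
    return result


below_sub1 = descendants("sub1")
hits = sorted(
    (order, product)
    for order, products in CONTAINS.items()
    for product in products
    if TOPIC.get(product) in below_sub1
)
print(hits)
```

Only the orders containing product a are returned, because its category sits two levels below sub1.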

links

  • https://neo4j.com/docs/cypher-refcard/current/

Hands on MongoDB

This is the script of a hands on session on MongoDB. The example refers to a typical retail example with products [CSV], users [CSV], and orders [CSV].

getting MongoDB running on your machine using Docker

docker pull mongo
docker run --name some-mongo -d mongo
docker run -it --link some-mongo:mongo --rm mongo sh -c 'exec mongo "$MONGO_PORT_27017_TCP_ADDR:$MONGO_PORT_27017_TCP_PORT/test"'

the basics

adding two users to obtain their ObjectId

db.user.insertMany([
    { "name" : "Alice"  },
    { "name" : "Bob"    }
])

looking up one of the users via ObjectId

db.user.find( {"_id": ObjectId("5bf6ce4dcdb0763e7b6b9138")} )

Note: the ObjectId on your machine will be different

adding products

db.products.insertMany([
    { name : "red apple", brand : "XYZ"},
    { name : "blue berry", brand : "XYZ"},
    { name : "cake", brand : "Acme"}
])

adding orders linked to users and products

Note 1: ObjectIds on your machine will be different

Note 2: qty is written on purpose sometimes as an int and sometimes as a string. You’ll see why below, in the part about finding documents

db.orders.insert({
  "utente" : {
    id: ObjectId("5bf6ce4dcdb0763e7b6b9138"),
    name: "Alice"
  },
  products : [{ 
      id: ObjectId("5bf6cfffcdb0763e7b6b913a"),
      name : "red apple",
      qty : 2
    },{
      id: ObjectId("5bf6cfffcdb0763e7b6b913b"),
      name : "blue berry",
      qty : 3
    }],
  date : 1536656557
})
db.orders.insert({
  "utente" : {
    id: ObjectId("5bf6ce4dcdb0763e7b6b9139"),
    name: "Bob"
  },
  products : [{ 
      id: ObjectId("5bf6cfffcdb0763e7b6b913a"),
      name : "red apple",
      qty : 4
    },{
      id: ObjectId("5bf6cfffcdb0763e7b6b913c"),
      name : "cake",
      qty : 2
    }],
  date : 1536656558
})
db.orders.insert({
  "utente" : {
    id: ObjectId("5bf6ce4dcdb0763e7b6b9139"),
    name: "Bob"
  },
  products : [{ 
      id: ObjectId("5bf6cfffcdb0763e7b6b913a"),
      name : "red apple",
      qty : "2"
    },{
      id: ObjectId("5bf6cfffcdb0763e7b6b913b"),
      name : "blue berry",
      qty : "2"
    }],
  date : "1536656560"
})
db.orders.insert({
  "utente" : {
    id: ObjectId("5bf6ce4dcdb0763e7b6b9138"),
    name: "Alice"
  },
  products : [{ 
      id: ObjectId("5bf6cfffcdb0763e7b6b913b"),
      name : "blue berry",
      qty : "2",
      comment : "please, the freshest ones!"
    },{
      id: ObjectId("5bf6cfffcdb0763e7b6b913c"),
      name : "cake",
      qty : 2
    }],
  date : 1536656559
})

finding orders

get all orders

SQL: select * from orders

MongoDB: db.orders.find({})

one simple condition on date

SQL: select * from orders where date = 1536656559

R: which(orders$date==1536656559)

MongoDB: db.orders.find({ date : 1536656559 })

another “simple” condition on the name of the buyer

SQL:

select * 
from orders join users on orders.user = users.id 
where users.name="Alice"

R: which((left_join(orders,users,by=c("user"="id")))$name=="Alice")

MongoDB:

db.orders.find({
    "utente.name": "Alice"
})

NOTE: MongoDB rocks! just some dot notation …

NOTE: however there’s a bad side: all joins that are not paths in the document must be handled by writing some code (see forEach)
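Dot notation is simply a path through nested documents. As a rough illustration of the idea (not MongoDB's actual matching code), the sketch below resolves a dotted path in plain Python dictionaries shaped like the orders above; get_path is a hypothetical helper and the orders are trimmed to the fields needed here.

```python
def get_path(document, dotted_path):
    """Follow a dotted path such as 'utente.name' through nested dicts."""
    current = document
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None  # the path does not exist in this document
        current = current[key]
    return current


orders = [
    {"utente": {"name": "Alice"}, "date": 1536656557},
    {"utente": {"name": "Bob"}, "date": 1536656558},
    {"utente": {"name": "Alice"}, "date": 1536656559},
]

alice_orders = [o for o in orders if get_path(o, "utente.name") == "Alice"]
print([o["date"] for o in alice_orders])
```

No join is needed because the buyer's name is stored inside each order document.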

a simple range query

SQL: select * from orders where date > 1536656559

R: which(orders$date>1536656559)

MongoDB:

db.orders.find({
  date : {$gt : 1536656559}
})

some simple SQL queries become hard (because of arrays)

SQL:

select * 
from orders  
where qty = 2

R: which(orders$qty==2)

MongoDB:

db.orders.find({
  products :  { $elemMatch: {qty: 2} }
})

NOTE: no dot notation here :-/ (see also elemMatch)
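The idea behind $elemMatch is that a document matches when at least one element of the array satisfies the whole condition. The sketch below illustrates this in plain Python on a trimmed copy of the four orders inserted above; elem_match is a hypothetical helper, not MongoDB's implementation.

```python
def elem_match(array, condition):
    """True if at least one element of array satisfies every field in condition."""
    return any(
        all(item.get(field) == value for field, value in condition.items())
        for item in array
    )


# The four orders from the text, trimmed to buyer name and quantities.
orders = [
    {"utente": "Alice", "products": [{"name": "red apple", "qty": 2}, {"name": "blue berry", "qty": 3}]},
    {"utente": "Bob", "products": [{"name": "red apple", "qty": 4}, {"name": "cake", "qty": 2}]},
    {"utente": "Bob", "products": [{"name": "red apple", "qty": "2"}, {"name": "blue berry", "qty": "2"}]},
    {"utente": "Alice", "products": [{"name": "blue berry", "qty": "2"}, {"name": "cake", "qty": 2}]},
]

matching = [i for i, o in enumerate(orders) if elem_match(o["products"], {"qty": 2})]
print(matching)
```

The third order is skipped: its quantities are the string "2", which is not equal to the number 2.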

multiple conjunctive conditions (a.k.a. ANDs)

SQL:

select * 
from orders join users on orders.user = users.id 
where users.name="Alice" and orders.qty > 1

R: ...

MongoDB:

db.orders.find({ 
  "utente.name": "Alice",
  products :  { $elemMatch: {qty: {$gt: 1} } }
})

Note: it does not match the qty expressed as strings
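The reason strings are skipped is that a comparison such as $gt with a numeric bound only considers numeric values. A minimal sketch of that rule in Python follows; gt is a hypothetical helper that imitates the behaviour, and the orders are trimmed to their quantities.

```python
def gt(value, threshold):
    """Type-aware greater-than: numbers are only compared with numbers."""
    def is_number(x):
        # bool is a subclass of int in Python, so exclude it explicitly.
        return isinstance(x, (int, float)) and not isinstance(x, bool)

    if is_number(value) and is_number(threshold):
        return value > threshold
    return False  # e.g. the string "2" is never greater than the number 1


# Quantities of the four orders from the text, in insertion order.
order_quantities = [[2, 3], [4, 2], ["2", "2"], ["2", 2]]

matching = [
    i for i, quantities in enumerate(order_quantities)
    if any(gt(q, 1) for q in quantities)
]
print(matching)
```

The order whose quantities are all strings drops out, while the last order still matches thanks to its one numeric quantity.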

multiple disjunctive conditions (a.k.a. ORs)

SQL:

select * 
from orders join users on orders.user = users.id 
where users.name="Alice" or orders.qty > 1

R: ...

MongoDB:

db.orders.find({ 
   $or : [
    {"utente.name": "Alice"},
    {products :  { $elemMatch: {qty: {$gt: 1} } }}
  ]
})

projections (a.k.a. SELECTs)

SQL:

select date 
from orders join users on orders.user = users.id 
where users.name="Alice" 

R: (left_join(orders,users,by=c("user"="id")) %>% filter(name=="Alice"))$date

MongoDB:

db.orders.find(
    {"utente.name": "Alice"},
    {date: 1, "_id":0, "products.qty": 1}
)

Aggregates: counting the orders for qty > 2 grouping by user

SQL:

select users.name, count(*) 
from orders join users on orders.user = users.id 
where orders.qty > 2
group by users.name

R: ...

MongoDB:

db.orders.aggregate( [ 
    {$match: {
        products : { $elemMatch: {qty: {$gt: 2} } }
    }}, 
    {$group: { _id: "$utente.name", count: {$sum: 1} }} 
])
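The pipeline has two stages: first keep the orders with at least one numeric quantity above 2, then count the surviving orders per buyer. Here is a rough Python equivalent on a trimmed copy of the four orders inserted above; it only illustrates the logic and is not how MongoDB executes the pipeline.

```python
from collections import Counter

# The four orders from the text, trimmed to buyer name and quantities.
orders = [
    {"utente": {"name": "Alice"}, "products": [{"qty": 2}, {"qty": 3}]},
    {"utente": {"name": "Bob"}, "products": [{"qty": 4}, {"qty": 2}]},
    {"utente": {"name": "Bob"}, "products": [{"qty": "2"}, {"qty": "2"}]},
    {"utente": {"name": "Alice"}, "products": [{"qty": "2"}, {"qty": 2}]},
]


def has_qty_above(order, threshold):
    """$match stage: at least one numeric qty greater than threshold."""
    return any(
        isinstance(p["qty"], (int, float)) and p["qty"] > threshold
        for p in order["products"]
    )


# $match, then $group with {$sum: 1}, i.e. one count per matching order.
matched = [o for o in orders if has_qty_above(o, 2)]
counts = Counter(o["utente"]["name"] for o in matched)
print(dict(counts))
```

Each buyer ends up with one matching order: Alice's first order (qty 3) and Bob's first order (qty 4).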

updates

Assigning all of Bob’s orders to Carl (simplified, ignoring the ObjectId)

db.orders.updateMany(
    { "utente.name": "Bob" },
    {
        $set: { "utente.name": "Carl" },
        $currentDate: { lastModified: true }
    }
)

indexes (advanced topic)

introduction

Note: the explain() function returns the execution plan of a query

notice the difference between the following two simple queries:

db.orders.find( { "_id" : ObjectId("5bfd06bbf645af7d9cce94b6")} ).explain()

MongoDB uses the IDHACK stage: a fast path that looks the document up directly in the default _id index, without going through query planning. The lookup is very fast, and its cost grows only logarithmically with the size of the collection.

db.orders.find( { "utente.name": "Carl" } ).explain()

MongoDB uses a COLLSCAN, i.e. it scans all the documents in the collection to find those that match. This is an operation MongoDB cannot perform in constant time: the larger the collection to scan, the longer it takes.

Adding an index

Let’s add an index on the name of the user

db.orders.createIndex( { "utente.name": 1 } )

let’s explain the previous query again

db.orders.find( { "utente.name": "Carl" } ).explain()

Now MongoDB uses an IXSCAN, i.e., it looks the value up in a B-tree that indexes the documents by this field. This is not a constant-time operation, but the time to find documents no longer grows linearly with the size of the collection: locating a value grows roughly logarithmically with the number of indexed entries, plus the cost of fetching the matching documents.
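The difference between the two plans can be sketched with a sorted list standing in for the index. This is only an analogy (MongoDB's index is a B-tree on disk and in cache, not a Python list), and every name in it is made up for the illustration. Both functions return the matching documents together with the number of entries they had to examine.

```python
import bisect


def collection_scan(docs, name):
    """Check every document; return (matches, documents examined)."""
    matches = [d for d in docs if d["name"] == name]
    return matches, len(docs)


def build_index(docs):
    """Sorted (name, position) pairs, a stand-in for the index."""
    return sorted((d["name"], i) for i, d in enumerate(docs))


def index_scan(docs, index, name):
    """Binary-search the index, then fetch only the matching documents."""
    lo = bisect.bisect_left(index, (name, -1))
    hi = bisect.bisect_left(index, (name, len(docs)))
    matches = [docs[pos] for _, pos in index[lo:hi]]
    return matches, hi - lo


# 1000 toy documents spread over 100 distinct names.
docs = [{"name": "user%d" % (i % 100)} for i in range(1000)]
index = build_index(docs)

scan_matches, scan_examined = collection_scan(docs, "user42")
index_matches, index_examined = index_scan(docs, index, "user42")
print(scan_examined, index_examined)
```

Both approaches return the same documents, but the scan touches every document while the index lookup touches only the entries for the requested name.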

For more information read Query Optimization > Explain Results

Remove index

db.orders.dropIndex( { "utente.name": 1 } )

indexing and horizontal scalability

Indexes are kept in memory and do not work well in the presence of shards, where they can slow down query execution. If you want to read more, see https://stackoverflow.com/questions/9084569/mongodb-index-in-memory-with-sharding

links to learn more

Hands on Hortonworks Data Platform

Note: this tutorial was tested for Hortonworks Data Platform (HDP®) 2.6.4 and 2.6.5 on Hortonworks Sandbox on a Mac using Docker and on a c5.2xlarge (8 cores, 16GB) in AWS using Docker.

Get Hortonworks Data Platform on Hortonworks Sandbox

  • get the VM
  • follow the installation guide
    • on a Mac
      • make sure you give the VM enough RAM (at least 8GB)
      • if you have 8 cores on your machine, consider giving 4 cores to the VM (you will be able to see parallelization in action)
    • on AWS
  • set up the environment
    • in particular understand on which IP your HDP is and map it to your desired hostname in the Hosts File
    • reset the admin password using the Web shell
    • reset the ambari admin password using the ambari-admin-password-reset script

Hands-on HDFS (ingestion)

Hands-on Hive (ingestion & wrangling)

  • go to hive 2.0 view
  • create new table with the wizard
    • referring to the HDFS file /user/maria_dev/data/geolocation.csv
    • have a look at the jobs! This is a full traditional ingestion+wrangling procedure!!!
  • learn some details
    • describe geolocation
    • show create table geolocation
    • describe formatted geolocation
  • for more information

Hands-on Zeppelin (exploration – part I)

  • open Zeppelin UI
  • create a new jdbc notebook
  • explore the content of geolocation table
%jdbc(hive)
SELECT city, event, count(event) as cnt FROM geolocation WHERE event != "normal" GROUP BY city, event
  • configure the bar chart as follows
    • keys: city
    • groups: event
    • values: cnt

hands-on Tez (admin role)

  • open the Tez view
  • have a look at the visual explanation of the plan
  • see the difference with the underlying tez plan
    • show the timeline in query details
    • open the DAG and show the Graph View and the Vertex Swimlane

hands-on sqoop (ingestion & wrangling)

      • configure sqoop
        • go to Web shell
        • login as root
        • get the jdbc driver for postgresql
          curl -L 'http://jdbc.postgresql.org/download/postgresql-9.2-1002.jdbc4.jar' -o postgresql-9.2-1002.jdbc4.jar
        • copy the driver to sqoop libs
          cp postgresql-9.2-1002.jdbc4.jar /usr/hdp/current/sqoop-client/lib/
      • test sqoop
        • check the connection
          sqoop list-tables --connect jdbc:postgresql://95.110.236.35:5432/sqooptest --username sqoop --password t3stsq00p
      • ingest the table in hive
    sqoop import --connect jdbc:postgresql://95.110.236.35:5432/sqooptest --username sqoop --password t3stsq00p --table trucks --hive-import --create-hive-table -m 1 --direct
    • NOTE: it gives a permission error, but it works

Hands-on Zeppelin (exploration – part II)

  • go back on zeppelin for some more exploration
%jdbc(hive) 
SELECT model, sum(jun13_miles)/sum(jun12_miles) as milesIndex, sum(jun13_gas)/sum(jun12_gas) as gasIndex  FROM trucks GROUP BY model
  • configure the bar chart as follows
    • keys: model
    • values: milesIndex, gasIndex

hands-on Oozie and Ambari Workflow Manager (data view)

  • go to Ambari Workflow Manager
  • build the Oozie workflow
    • add a fork
    • on a branch add two Hive2 actions in a row
      • note: the jdbc connection URL is
        jdbc:hive2://sandbox-hdp.hortonworks.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
      • TruckMiliage:
        CREATE TABLE TruckMiliage STORED AS ORC AS SELECT truckid, driverid, rdate, miles, gas, miles / gas mpg FROM trucks LATERAL VIEW stack(54, 'jun13',jun13_miles,jun13_gas,'may13',may13_miles,may13_gas,'apr13',apr13_miles,apr13_gas,'mar13',mar13_miles,mar13_gas,'feb13',feb13_miles,feb13_gas,'jan13',jan13_miles,jan13_gas,'dec12',dec12_miles,dec12_gas,'nov12',nov12_miles,nov12_gas,'oct12',oct12_miles,oct12_gas,'sep12',sep12_miles,sep12_gas,'aug12',aug12_miles,aug12_gas,'jul12',jul12_miles,jul12_gas,'jun12',jun12_miles,jun12_gas,'may12',may12_miles,may12_gas,'apr12',apr12_miles,apr12_gas,'mar12',mar12_miles,mar12_gas,'feb12',feb12_miles,feb12_gas,'jan12',jan12_miles,jan12_gas,'dec11',dec11_miles,dec11_gas,'nov11',nov11_miles,nov11_gas,'oct11',oct11_miles,oct11_gas,'sep11',sep11_miles,sep11_gas,'aug11',aug11_miles,aug11_gas,'jul11',jul11_miles,jul11_gas,'jun11',jun11_miles,jun11_gas,'may11',may11_miles,may11_gas,'apr11',apr11_miles,apr11_gas,'mar11',mar11_miles,mar11_gas,'feb11',feb11_miles,feb11_gas,'jan11',jan11_miles,jan11_gas,'dec10',dec10_miles,dec10_gas,'nov10',nov10_miles,nov10_gas,'oct10',oct10_miles,oct10_gas,'sep10',sep10_miles,sep10_gas,'aug10',aug10_miles,aug10_gas,'jul10',jul10_miles,jul10_gas,'jun10',jun10_miles,jun10_gas,'may10',may10_miles,may10_gas,'apr10',apr10_miles,apr10_gas,'mar10',mar10_miles,mar10_gas,'feb10',feb10_miles,feb10_gas,'jan10',jan10_miles,jan10_gas,'dec09',dec09_miles,dec09_gas,'nov09',nov09_miles,nov09_gas,'oct09',oct09_miles,oct09_gas,'sep09',sep09_miles,sep09_gas,'aug09',aug09_miles,aug09_gas,'jul09',jul09_miles,jul09_gas,'jun09',jun09_miles,jun09_gas,'may09',may09_miles,may09_gas,'apr09',apr09_miles,apr09_gas,'mar09',mar09_miles,mar09_gas,'feb09',feb09_miles,feb09_gas,'jan09',jan09_miles,jan09_gas ) dummyalias AS rdate, miles, gas;
      • DriverMileage:
        CREATE TABLE DriverMileage STORED AS ORC AS SELECT driverid, sum(miles) totmiles FROM TruckMiliage GROUP BY driverid;
    • on another branch add another Hive2 action
      • GeolocationAgg:
        CREATE TABLE GeolocationAgg STORED AS ORC AS SELECT driverid, count(driverid) occurance from geolocation where event!='normal' group by driverid
    • after the join action add two more Hive2 actions in a row
      • joined:
        CREATE TABLE joined STORED AS ORC AS SELECT a.driverid, a.occurance, b.totmiles FROM GeolocationAgg a, DriverMileage b WHERE a.driverid=b.driverid
      • riskfactor:
        CREATE TABLE riskfactor STORED AS ORC AS select driverid, occurance, totmiles, totmiles/occurance riskfactor from joined
    • validate the workflow saving it in /user/maria_dev/data/wf/riskanalysis.xml
    • submit and run it on submission (overwrite)
    • follow the progress on
  • Solution
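The Hive statements in the workflow above do three things: stack() turns the wide trucks table (one pair of columns per month) into one row per truck and month, the mileage is then summed per driver, and finally the total is divided by the number of non-normal events. The sketch below walks through the same steps in plain Python on made-up data: the two truck rows, the two months and the event counts are hypothetical values, not taken from the real tables.

```python
# Hypothetical wide rows: one pair of columns per month, as in the trucks table.
trucks = [
    {"truckid": "A1", "driverid": "D1",
     "jun13_miles": 1000, "jun13_gas": 100, "may13_miles": 500, "may13_gas": 50},
    {"truckid": "A2", "driverid": "D2",
     "jun13_miles": 300, "jun13_gas": 20, "may13_miles": 700, "may13_gas": 35},
]
MONTHS = ["jun13", "may13"]


def unpivot(rows, months):
    """Turn each wide row into one long row per month (the role of stack())."""
    long_rows = []
    for row in rows:
        for month in months:
            miles = row[month + "_miles"]
            gas = row[month + "_gas"]
            long_rows.append({
                "truckid": row["truckid"], "driverid": row["driverid"],
                "rdate": month, "miles": miles, "gas": gas, "mpg": miles / gas,
            })
    return long_rows


truck_mileage = unpivot(trucks, MONTHS)

# Total miles per driver (the DriverMileage step).
totmiles = {}
for r in truck_mileage:
    totmiles[r["driverid"]] = totmiles.get(r["driverid"], 0) + r["miles"]

# Hypothetical counts of non-normal events per driver (the GeolocationAgg step).
occurrences = {"D1": 3, "D2": 5}

# Join on driverid and divide (the joined and riskfactor steps).
riskfactor = {d: totmiles[d] / occurrences[d] for d in totmiles if d in occurrences}
print(riskfactor)
```

The two aggregations do not depend on each other, which is why the workflow can run them on parallel branches of the fork before joining.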

Hands-on Zeppelin (visualize the results of the analysis)

  • go back on zeppelin
%jdbc(hive)
SELECT a.driverid, a.riskfactor, b.city
FROM riskfactor a, geolocation b 
WHERE  a.driverid=b.driverid AND b.state = "California"
ORDER BY a.riskfactor DESC
  • configure the scatter chart as follows
    • xAxis: city
    • yAxis: driverID
    • size: riskFactor
  • Solution

where to learn more