Compiled from thirsd's personal space on Bilibili.
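1. Graph modeling
Start from the goal of the model: write down the list of questions it must answer, design a model that can answer them, and keep refining the model for as long as some question cannot be answered.
Existing graph models from your industry or from other projects are a useful reference: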
neo4j.com/graphgists
neo4j.com/sandbox
2. Cypher examples
Query example:
match (node:Entity {key:value} )
where node.otherkey < 5
match (node)-[:REL]->(other:Thing)
with node, collect(other.name) as others
where size(others) > 10
return *
Update example:
load csv from "url" as row
create (from:Entity {id:row.source})
set from.key = toInteger(row.fromKey)
Merge (to:Thing {id:row.target})
on create set to.value = toFloat(row.value)
Merge (from)-[:REL]->(to)
Remove to.tmp
set from:Processed
Calling procedures and functions:
call procedure.name("param", 10)
yield key as id, value as user
where id > 10 and function.name(user, "abc") = 5
return id, user
Creating indexes and constraints:
// Create an index on a property of a label
create index on :label(property);
// Create a uniqueness constraint on a property of a label
create constraint on (n:label) assert n.property is unique;
// Create a property existence constraint for a label
create constraint on (n:label) assert exists(n.property);
// Create a property existence constraint for a relationship type
create constraint on ()-[r:REL]-() assert exists(r.property);
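3. Efficient querying and importing
When MATCH and MERGE look up nodes by property value, create indexes and constraints on those properties to speed up the queries.
3.1 How Neo4j uses indexes
Indexes are used only to find the starting nodes of a query.
Create the Neo4j schema: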
create constraint on (u:User) assert u.id is unique;
create constraint on (b:Business) assert b.id is unique;
create constraint on (r:Review) assert r.id is unique;
create constraint on (c:Category) assert c.id is unique;
create index on :Business(location);
create index on :Business(city);
create index on :Review(date);
3.2 Importing data
To convert JSON to CSV, you can use jq (a lightweight command-line tool for processing JSON).
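Where jq is not available, the same conversion can be sketched with the Python standard library. The example below is only an illustration: it assumes the input has one JSON object per line, and the function name, field names, and sample records are all hypothetical.

```python
import csv
import io
import json


def json_lines_to_csv(lines, fields, out):
    """Write one CSV row per JSON object, keeping only the given fields."""
    writer = csv.DictWriter(out, fieldnames=fields)
    writer.writeheader()
    count = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)
        # Keys missing from a record are written as empty strings
        writer.writerow({f: record.get(f, "") for f in fields})
        count += 1
    return count


# Hypothetical sample input: one JSON object per line
sample = [
    '{"user_id": "u1", "name": "Ann", "review_count": 12, "fans": 3}',
    '{"user_id": "u2", "name": "Bob", "review_count": 5}',
]
buffer = io.StringIO()
rows_written = json_lines_to_csv(sample, ["user_id", "name", "review_count"], buffer)
csv_text = buffer.getvalue()
print(csv_text)
```

To write a real file instead of an in-memory buffer, pass a file opened with newline="" as the out argument.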
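When importing:
1. Copy the CSV files into the import directory
2. CSV values are all strings, so convert them to the target data types
Numbers: toInteger(row.num), toFloat(row.money)
Booleans: case row.action when "buy" then true else false end
Dates and times: date(row.date), datetime(row.time)
Locations: point({latitude: toFloat(row.lat), longitude: toFloat(row.lon)})
Lists: split(row.categories, ",")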
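If the rows are prepared on the client side (for example before sending them through a driver), the same conversions could look roughly like the Python sketch below. The column names come from the list above; the function name and the sample row are hypothetical.

```python
from datetime import date


def convert_row(row):
    """Turn the string fields of one CSV row into typed values."""
    return {
        "num": int(row["num"]),                      # like toInteger(row.num)
        "money": float(row["money"]),                # like toFloat(row.money)
        "bought": row["action"] == "buy",            # like the case ... end expression
        "date": date.fromisoformat(row["date"]),     # like date(row.date)
        "categories": row["categories"].split(","),  # like split(row.categories, ",")
    }


# Hypothetical sample row, as csv.DictReader would return it
raw = {"num": "42", "money": "9.5", "action": "buy",
       "date": "2020-11-22", "categories": "Bars,Food"}
typed = convert_row(raw)
print(typed)
```

One difference to keep in mind: int() and float() raise ValueError on input they cannot parse, so bad rows have to be handled explicitly.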
Method 1: naive import
load csv with headers from "file:///review.csv" as row
merge (b:Business {id: row.business_id})
merge (u:User {id: row.user_id})
merge (r:Review {id: row.review_id})
on create set r.stars = toInteger(row.stars), r.text = row.text
merge (r)-[:REVIEWS]->(b)
merge (u)-[rr:WROTE]->(r)
on create set rr.date = row.date
Prefix a Cypher statement with explain to see its execution plan; this also works for load csv statements.
Method 2: multiple passes, importing one file at a time
load csv with headers from "file:///user.csv" as row
merge (u:User {id:row.user_id})
on create set u.name = row.name, u.reviews = toInteger(row.review_count), u.date = date(row.yelping_since);
load csv with headers from "file:///business.csv" as row
merge (b:Business {id: row.business_id})
on create set b.stars = toFloat(row.stars), b.reviews = toInteger(row.review_count), b.location = point({latitude: toFloat(row.latitude), longitude: toFloat(row.longitude)}), b += row {.name, .address, .city, .state};
load csv with headers from "file:///review.csv" as row
match (u:User {id: row.user_id})
match (b:Business {id: row.business_id})
create (r:Review {id: row.review_id})
set r.stars = toInteger(row.stars), r.date = date(row.date), r.text = row.text
create (u)-[:WROTE]->(r)
create (r)-[:REVIEWS]->(b);
Ways to run a multi-statement Cypher script:
1. In Neo4j Browser, turn on the multi-statement editor setting
2. cat import/import_xxx.cypher | bin/cypher-shell
Method 3: periodic commit
using periodic commit 20000
load csv with headers from "file:///user.csv" as row
merge (u:User {id:row.user_id})
on create set u.name = row.name,u.reviews =toInteger(row.review_count), u.date = date(row.yelping_since);
Method 4: apoc.load.json and apoc.periodic.iterate
Inspect the JSON file:
call apoc.load.json("file:///user.json")
yield value
return value limit 10;
call apoc.load.json("file:///user.json")
yield value
return count(*);
Load the JSON file:
call apoc.load.json("file:///user.json")
yield value
with value limit 10000
merge (u:User {id:value.user_id})
on create set
u.name = value.name,
u.fans = value.fans;
call apoc.load.json("file:///user.json")
yield value
with value limit 100
match (u:User {id: value.user_id})
unwind value.friends as friend
merge (f:User {id:friend})
merge (u)-[:FRIEND]-(f);
call apoc.load.json("file:///business.json")
yield value
with value limit 1000
merge (b:Business {id: value.business_id})
on create set b += value{.name, .review_count, .stars, .address, .state},
b.location = point(value{.latitude,.longitude})
with b, value.categories as categories
unwind categories as cat
merge (c:Category {name:cat})
merge (b)-[:IN_CATEGORY]->(c);
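The approaches above have the following problems:
1. They run single-threaded
2. The whole transaction has to fit in RAM
Solution: apoc.periodic.iterate, which splits one transaction into small batches and can run them in parallel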
call apoc.periodic.iterate(
"call apoc.load.json('file:///user.json') yield value return value skip 1000 limit 200000",
"create (u:User {id: value.user_id}) set u.name = value.name, u += value{.review_count, .fans}",
{batchSize:10000, iterateList:true, parallel:true, concurrency:3}
);
apoc.periodic.iterate can import JSON and, in the same way, CSV.
call apoc.periodic.iterate(
"load csv with headers from 'file:///user.csv' as row return row",
"merge (u:User {id:row.user_id}) on create set u.name = row.name, u.reviews = toInteger(row.review_count), u.date = date(row.yelping_since)",
{batchSize:10000, iterateList:true, parallel:true, concurrency:3}
);
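Note: the first statement ends with return row; the returned rows are what gets split into batches.
A batchSize between 10k and 50k is recommended, and iterateList is best left on (the default).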
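The batching idea can be illustrated with a small Python sketch. It is not how APOC is implemented; it only shows a stream of returned rows being cut into lists of at most batchSize items, each of which would then be processed in its own transaction. The function name and the sample rows are hypothetical.

```python
from itertools import islice


def split_into_batches(rows, batch_size):
    """Yield consecutive lists of at most batch_size rows."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # the input is exhausted
        yield batch


# Hypothetical rows standing in for the output of "return row"
rows = [{"user_id": f"u{i}"} for i in range(25)]
batch_sizes = [len(batch) for batch in split_into_batches(rows, 10)]
print(batch_sizes)  # [10, 10, 5]
```

Because only one batch is held in memory at a time, the memory needed per transaction is bounded by the batch size rather than by the size of the whole input.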
Method 5: command-line import tool
Import with the ./bin/neo4j-admin import command
Invocation example: [screenshot missing from the source]
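Notes:
1. Create indexes and constraints after the import
2. Performance is much higher: more than a hundred million records can be imported in around 100 s
3. Make sure there is enough memory and disk space
4. CSV files may be compressed, and the input files can be selected with a regular expression
5. Specify data types in the header
stars:int, is_active:boolean, types:string[], date:date
location:point{crs:WGS-84}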
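As a rough sketch of what a typed header looks like in practice, the Python snippet below writes a small node file for the import tool. The :ID(User) and :LABEL header fields belong to the import tool's CSV format but are not mentioned in these notes, and the column names and sample users are hypothetical.

```python
import csv
import io

# Typed header: an ID column, a plain string column, two typed columns, a label column
HEADER = ["user_id:ID(User)", "name", "review_count:int", "is_active:boolean", ":LABEL"]


def write_nodes(users, out):
    """Write a node CSV file whose header declares the column types."""
    writer = csv.writer(out)
    writer.writerow(HEADER)
    for user in users:
        writer.writerow([
            user["user_id"],
            user["name"],
            user["review_count"],
            str(user["is_active"]).lower(),  # write booleans as true / false
            "User",
        ])


# Hypothetical sample data
users = [
    {"user_id": "u1", "name": "Ann", "review_count": 12, "is_active": True},
    {"user_id": "u2", "name": "Bob", "review_count": 5, "is_active": False},
]
buffer = io.StringIO()
write_nodes(users, buffer)
node_lines = buffer.getvalue().splitlines()
print(node_lines)
```

Declaring the types in the header means the values are converted during the import itself, so no toInteger or toFloat calls are needed afterwards.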
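Method 6: RDBMS import tool: Neo4j ETL
Suited to the initial import
Steps:
1. Configure and select the RDBMS data source
2. Select the target Neo4j database
3. Load the relational metadata
4. Review and edit the mapping
5. Export the tables to CSV, saved in the import directory
6. Import the data with the import tool
7. If the imported structure is wrong, use apoc.refactor to restructure it
Method 7: import with the neo4j-driver package for Python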
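A minimal sketch of such an import, assuming the official Python driver is installed and a server is reachable. The function name, the connection details in the usage comment, and the choice to send rows as a $rows parameter consumed by UNWIND are assumptions made for illustration, not something the notes prescribe.

```python
from itertools import islice

# Each batch is passed as the $rows parameter and unrolled on the server
IMPORT_USERS = """
UNWIND $rows AS row
MERGE (u:User {id: row.user_id})
ON CREATE SET u.name = row.name, u.reviews = toInteger(row.review_count)
"""


def import_users(uri, user, password, rows, batch_size=10000):
    """Send rows to Neo4j in batches; returns the number of batches sent."""
    # Imported here so this module still loads where the driver is not installed
    from neo4j import GraphDatabase

    iterator = iter(rows)
    sent = 0
    with GraphDatabase.driver(uri, auth=(user, password)) as driver:
        with driver.session() as session:
            while True:
                batch = list(islice(iterator, batch_size))
                if not batch:
                    break
                # Each run() outside an explicit transaction commits on its own
                session.run(IMPORT_USERS, rows=batch).consume()
                sent += 1
    return sent


# Hypothetical usage, with placeholder connection details:
#   import csv
#   with open("import/user.csv", newline="") as f:
#       import_users("bolt://localhost:7687", "neo4j", "password", csv.DictReader(f))
```

Sending a list of rows per statement keeps the number of round trips and transactions small, in the same spirit as the batch sizes recommended above.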
Method 8: other import tools
knowbi-pentaho-pdi-neo4j-output
Talend Neo4j integration
Streamsets
GraphAware Databridge