match (m)-[x]-(n) where m.age>30 return m.name, m.age
如上查询语句产生如下查询计划:
-
Pipe
: Most pipes are decorators - they wrap another pipe. ParamPipe and NullPipe are the only exceptions to this.
-
PipeWithSource
: 将sourcePipe生成的results作为input,调用internalCreateResults(sourceResult, decoratedState)
/**
  * Base class for pipes that consume the rows produced by another pipe.
  *
  * `createResults` first asks `source` for its rows, then hands those rows,
  * together with a decorated query state, to the two-argument
  * `internalCreateResults` that concrete subclasses implement.
  *
  * @param source the upstream pipe whose results become this pipe's input
  */
abstract class PipeWithSource(source: Pipe) extends Pipe {

  /**
    * Runs the upstream pipe and feeds its rows into this pipe.
    *
    * @param state the query state for the current execution
    * @return the rows produced by this pipe, passed through the state's decorator
    */
  override def createResults(state: QueryState): Iterator[ExecutionContext] = {
    // The upstream pipe is asked for its results before this pipe's state is decorated.
    val upstreamRows = source.createResults(state)
    val wrappedState = state.decorator.decorate(this, state)
    wrappedState.setExecutionContextFactory(executionContextFactory)
    val producedRows = internalCreateResults(upstreamRows, wrappedState)
    state.decorator.decorate(this, producedRows)
  }

  /**
    * The single-argument variant is not supported on pipes with a source;
    * calling it always throws.
    */
  protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] =
    throw new UnsupportedOperationException("This method should never be called on PipeWithSource")

  /**
    * Transforms the upstream rows into this pipe's output rows.
    *
    * @param input rows produced by `source`
    * @param state the decorated query state
    */
  protected def internalCreateResults(input: Iterator[ExecutionContext],
                                      state: QueryState): Iterator[ExecutionContext]

  /** Package-visible entry point that forwards directly to `internalCreateResults(input, state)`. */
  private[pipes] def testCreateResults(input: Iterator[ExecutionContext],
                                       state: QueryState): Iterator[ExecutionContext] =
    internalCreateResults(input, state)
}
-
ProduceResultsPipe
: 将input每一行转换成 columns: Seq[String]构成的Map
/**
  * Pipe declared over an upstream `source` and a list of output `columns`.
  *
  * Only the class header is reproduced in this excerpt; the implementation of
  * `internalCreateResults` is not shown here.
  * NOTE(review): presumably each input row is reduced to a map keyed by
  * `columns` - confirm against the full source.
  *
  * @param source  the upstream pipe, forwarded to `PipeWithSource`
  * @param columns names of the result columns
  * @param id      identifier of this pipe; defaults to `Id.INVALID_ID`
  */
case class ProduceResultsPipe(source: Pipe, columns: Seq[String]) (val id: Id = Id.INVALID_ID) extends PipeWithSource(source)
-
ProjectionPipe
:将input按照projection方式进行投影,主要调用projection.project(ctx, state)
/**
  * Applies a projection to every row coming from the upstream pipe.
  *
  * @param source     the upstream pipe
  * @param projection the projection invoked once per row via `project(row, state)`
  * @param id         identifier of this pipe; defaults to `Id.INVALID_ID`
  */
case class ProjectionPipe(source: Pipe, projection: CommandProjection)
                         (val id: Id = Id.INVALID_ID) extends PipeWithSource(source) {

  projection.registerOwningPipe(this)

  /**
    * Returns the input untouched when the projection is empty; otherwise maps
    * each row through the projection and yields that same row object.
    */
  protected def internalCreateResults(input: Iterator[ExecutionContext],
                                      state: QueryState): Iterator[ExecutionContext] = {
    // `project` is called for its effect on the row; the row itself is what flows downstream.
    def projected(row: ExecutionContext): ExecutionContext = {
      projection.project(row, state)
      row
    }

    if (!projection.isEmpty) input.map(projected)
    else input
  }
}
-
AllNodesScanPipe
: 扫描所有的nodes,主要调用state.query.nodeOps.all.map
/**
  * Leaf pipe that emits one row per node returned by `state.query.nodeOps.all`.
  *
  * @param ident the name under which each node is stored in its row
  * @param id    identifier of this pipe; defaults to `Id.INVALID_ID`
  */
case class AllNodesScanPipe(ident: String)(val id: Id = Id.INVALID_ID) extends Pipe {

  /**
    * Creates one base context, then produces a copy of it per node with the
    * node bound to `ident`.
    */
  protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
    val template = state.newExecutionContext(executionContextFactory)
    for (node <- state.query.nodeOps.all)
      yield executionContextFactory.copyWith(template, ident, node)
  }
}
-
ExpandAllPipe
: 对input中的每一行,从fromName对应的节点出发,展开所有满足 (fromName)-[relName]-(toName) 的边;边的方向由dir决定,边的类型由types限定
/**
  * Expands every input row along the relationships of its start node.
  *
  * For each relationship found, a copy of the row is emitted with the
  * relationship bound to `relName` and the node at the other end bound to
  * `toName`.
  *
  * @param source   the upstream pipe
  * @param fromName name of the start node (used in the error message below)
  * @param relName  name under which each relationship is stored
  * @param toName   name under which the opposite node is stored
  * @param dir      direction passed to `getRelationshipsForIds`
  * @param types    relationship types, resolved through `types.types(state.query)`
  * @param id       identifier of this pipe; defaults to `Id.INVALID_ID`
  */
case class ExpandAllPipe(source: Pipe,
                         fromName: String,
                         relName: String,
                         toName: String,
                         dir: SemanticDirection,
                         types: LazyTypes)
                        (val id: Id = Id.INVALID_ID) extends PipeWithSource(source) {

  /**
    * Produces zero or more output rows per input row.
    *
    * A row whose start value is `Values.NO_VALUE` yields nothing; a start
    * value that is neither a node nor `NO_VALUE` raises an `InternalException`.
    */
  protected def internalCreateResults(input: Iterator[ExecutionContext],
                                      state: QueryState): Iterator[ExecutionContext] =
    input.flatMap { ctx =>
      // NOTE(review): `getFromNode` is not defined in this excerpt; presumably it
      // looks up `fromName` in the row - confirm against the full source.
      getFromNode(ctx) match {
        case startNode: NodeValue =>
          val rels: Iterator[RelationshipValue] =
            state.query.getRelationshipsForIds(startNode.id(), dir, types.types(state.query))
          rels.map { rel =>
            executionContextFactory.copyWith(ctx, relName, rel, toName, rel.otherNode(startNode))
          }

        // No start node: this row contributes no output rows.
        case Values.NO_VALUE =>
          Iterator.empty

        case value =>
          throw new InternalException(s"Expected to find a node at '$fromName' but found $value instead")
      }
    }
}
-
FilterPipe
: 过滤结果集,核心在于调用input.filter
/**
  * Keeps only the input rows for which `predicate` evaluates to `Values.TRUE`.
  *
  * @param source    the upstream pipe
  * @param predicate expression evaluated once per row
  * @param id        identifier of this pipe; defaults to `Id.INVALID_ID`
  */
case class FilterPipe(source: Pipe, predicate: Expression)
                     (val id: Id = Id.INVALID_ID) extends PipeWithSource(source) {

  predicate.registerOwningPipe(this)

  /**
    * Filters the input using the predicate.
    *
    * The comparison is by reference (`eq`), so any result other than the
    * `Values.TRUE` instance causes the row to be dropped.
    */
  protected def internalCreateResults(input: Iterator[ExecutionContext],
                                      state: QueryState): Iterator[ExecutionContext] =
    input.filter { row =>
      val verdict = predicate(row, state)
      verdict eq Values.TRUE
    }
}