Skip to content

Commit

Permalink
增加take分组函数
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Aug 24, 2020
1 parent 98bec9b commit de247a4
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ static <T> void createCalculator(BiFunction<String, BiFunction<Number, Number, O

// group by interval('1s')
addGlobal(new GroupByIntervalFeature());
addGlobal(new GroupByTakeFeature());
//按分组支持
Arrays.asList(
"property",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.jetlinks.reactor.ql.supports.group;

import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;
import net.sf.jsqlparser.expression.LongValue;
import net.sf.jsqlparser.expression.StringValue;
import org.jetlinks.reactor.ql.ReactorQLMetadata;
import org.jetlinks.reactor.ql.ReactorQLRecord;
import org.jetlinks.reactor.ql.feature.FeatureId;
import org.jetlinks.reactor.ql.feature.GroupFeature;
import org.jetlinks.reactor.ql.utils.ExpressionUtils;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.List;

import static org.jetlinks.reactor.ql.utils.CastUtils.parseDuration;

/**
* 分组取指定数量数据
* <pre>
* group by take(10,-2) => flux.take(10).takeLast(2)
*
* group by take(-1)=> flux.takeLast(1)
* </pre>
*
* @author zhouhao
* @since 1.0
*/
@Slf4j
public class GroupByTakeFeature implements GroupFeature {

public final static String ID = FeatureId.GroupBy.of("take").getId();

@Override
public String getId() {
return ID;
}

@Override
public java.util.function.Function<Flux<ReactorQLRecord>, Flux<? extends Flux<ReactorQLRecord>>> createGroupMapper(Expression expression, ReactorQLMetadata metadata) {

Function function = ((Function) expression);
List<Expression> expressions;
if (function.getParameters() == null || (expressions = function.getParameters().getExpressions()).isEmpty()) {
throw new UnsupportedOperationException("take函数参数错误");
}
int first = ExpressionUtils.getSimpleValue(expressions.get(0)).map(Number.class::cast).map(Number::intValue).orElse(1);
boolean hasSecond = expressions.size() > 1;

int second = hasSecond
? ExpressionUtils
.getSimpleValue(expressions.get(1)).map(Number.class::cast).map(Number::intValue)
.orElse(1)
: 1;

if (first >= 0) { // take(n)
if (hasSecond) {
if (second >= 0) { //take(n,n2)
return flux -> flux.take(first).take(second).as(Flux::just);
} else { //take(n,-n2)
return flux -> flux.take(first).takeLast(-second).as(Flux::just);
}
}
return flux -> flux.take(first).as(Flux::just);
} else { // take(-n)
if (hasSecond) {
if (second >= 0) { // take(-n,n2)
return flux -> flux.takeLast(first).take(second).as(Flux::just);
} else { // take(-n,-n2)
return flux -> flux.takeLast(first).takeLast(-second).as(Flux::just);
}
}
return flux -> flux.takeLast(-first).as(Flux::just);
}
}


}
36 changes: 33 additions & 3 deletions src/test/java/org/jetlinks/reactor/ql/ReactorQLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ void testGroupTake2() {
}

@Test
void testGroupTakeLast() {
void testTakeLast() {
ReactorQL.builder()
.sql("select take(this,-1) v from test group by _window(5)")
.build()
Expand All @@ -1139,10 +1139,20 @@ void testGroupTakeLast() {
.as(StepVerifier::create)
.expectNext(4, 9, 10)
.verifyComplete();

ReactorQL.builder()
.sql("select this v from test group by _window(5), take(-1)")
.build()
.start(Flux.range(0, 11))
.doOnNext(System.out::println)
.map(map -> map.get("v"))
.as(StepVerifier::create)
.expectNext(4, 9, 10)
.verifyComplete();
}

@Test
void testGroupTake3Last2() {
void testTake3Last2() {
ReactorQL.builder()
.sql("select take(this,3,-2) v from test group by _window(5)")
.build()
Expand All @@ -1152,10 +1162,20 @@ void testGroupTake3Last2() {
.as(StepVerifier::create)
.expectNext(1, 2, 6, 7, 10)
.verifyComplete();

ReactorQL.builder()
.sql("select take(this,3,-2) v from test group by _window(5),take(3,-2)")
.build()
.start(Flux.range(0, 11))
.doOnNext(System.out::println)
.map(map -> map.get("v"))
.as(StepVerifier::create)
.expectNext(1, 2, 6, 7, 10)
.verifyComplete();
}

@Test
void testGroupLast3Tak2() {
void testLast3Tak2() {
ReactorQL.builder()
.sql("select take(this,-3,2) v from test group by _window(5)")
.build()
Expand All @@ -1165,6 +1185,16 @@ void testGroupLast3Tak2() {
.as(StepVerifier::create)
.expectNext(2, 3, 7, 8,10)
.verifyComplete();

ReactorQL.builder()
.sql("select take(this,3,-2) v from test group by _window(5),take(3,-2)")
.build()
.start(Flux.range(0, 11))
.doOnNext(System.out::println)
.map(map -> map.get("v"))
.as(StepVerifier::create)
.expectNext(1, 2, 6, 7, 10)
.verifyComplete();
}


Expand Down

0 comments on commit de247a4

Please sign in to comment.