diff --git a/src/main/java/org/jetlinks/reactor/ql/supports/DefaultReactorQLMetadata.java b/src/main/java/org/jetlinks/reactor/ql/supports/DefaultReactorQLMetadata.java index ea42f48..6b9b58f 100644 --- a/src/main/java/org/jetlinks/reactor/ql/supports/DefaultReactorQLMetadata.java +++ b/src/main/java/org/jetlinks/reactor/ql/supports/DefaultReactorQLMetadata.java @@ -118,6 +118,7 @@ static void createCalculator(BiFunction + * group by take(10,-2) => flux.take(10).takeLast(2) + * + * group by take(-1)=> flux.takeLast(1) + * + * + * @author zhouhao + * @since 1.0 + */ +@Slf4j +public class GroupByTakeFeature implements GroupFeature { + + public final static String ID = FeatureId.GroupBy.of("take").getId(); + + @Override + public String getId() { + return ID; + } + + @Override + public java.util.function.Function, Flux>> createGroupMapper(Expression expression, ReactorQLMetadata metadata) { + + Function function = ((Function) expression); + List expressions; + if (function.getParameters() == null || (expressions = function.getParameters().getExpressions()).isEmpty()) { + throw new UnsupportedOperationException("take函数参数错误"); + } + int first = ExpressionUtils.getSimpleValue(expressions.get(0)).map(Number.class::cast).map(Number::intValue).orElse(1); + boolean hasSecond = expressions.size() > 1; + + int second = hasSecond + ? ExpressionUtils + .getSimpleValue(expressions.get(1)).map(Number.class::cast).map(Number::intValue) + .orElse(1) + : 1; + + if (first >= 0) { // take(n) + if (hasSecond) { + if (second >= 0) { //take(n,n2) + return flux -> flux.take(first).take(second).as(Flux::just); + } else { //take(n,-n2) + return flux -> flux.take(first).takeLast(-second).as(Flux::just); + } + } + return flux -> flux.take(first).as(Flux::just); + } else { // take(-n) + if (hasSecond) { + if (second >= 0) { // take(-n,n2) + return flux -> flux.takeLast(first).take(second).as(Flux::just); + } else { // take(-n,-n2) + return flux -> flux.takeLast(first).takeLast(-second).as(Flux::just); + } + } + return flux -> flux.takeLast(-first).as(Flux::just); + } + } + + +} diff --git a/src/test/java/org/jetlinks/reactor/ql/ReactorQLTest.java b/src/test/java/org/jetlinks/reactor/ql/ReactorQLTest.java index a0aeb02..4c122c8 100644 --- a/src/test/java/org/jetlinks/reactor/ql/ReactorQLTest.java +++ b/src/test/java/org/jetlinks/reactor/ql/ReactorQLTest.java @@ -1129,7 +1129,7 @@ void testGroupTake2() { } @Test - void testGroupTakeLast() { + void testTakeLast() { ReactorQL.builder() .sql("select take(this,-1) v from test group by _window(5)") .build() @@ -1139,10 +1139,20 @@ void testGroupTakeLast() { .as(StepVerifier::create) .expectNext(4, 9, 10) .verifyComplete(); + + ReactorQL.builder() + .sql("select this v from test group by _window(5), take(-1)") + .build() + .start(Flux.range(0, 11)) + .doOnNext(System.out::println) + .map(map -> map.get("v")) + .as(StepVerifier::create) + .expectNext(4, 9, 10) + .verifyComplete(); } @Test - void testGroupTake3Last2() { + void testTake3Last2() { ReactorQL.builder() .sql("select take(this,3,-2) v from test group by _window(5)") .build() @@ -1152,10 +1162,20 @@ void testGroupTake3Last2() { .as(StepVerifier::create) .expectNext(1, 2, 6, 7, 10) .verifyComplete(); + + ReactorQL.builder() + .sql("select take(this,3,-2) v from test group by _window(5),take(3,-2)") + .build() + .start(Flux.range(0, 11)) + .doOnNext(System.out::println) + .map(map -> map.get("v")) + .as(StepVerifier::create) + .expectNext(1, 2, 6, 7, 10) + .verifyComplete(); } @Test - void testGroupLast3Tak2() { + void testLast3Tak2() { ReactorQL.builder() .sql("select take(this,-3,2) v from test group by _window(5)") .build() @@ -1165,6 +1185,16 @@ void testGroupLast3Tak2() { .as(StepVerifier::create) .expectNext(2, 3, 7, 8,10) .verifyComplete(); + + ReactorQL.builder() + .sql("select take(this,3,-2) v from test group by _window(5),take(3,-2)") + .build() + .start(Flux.range(0, 11)) + .doOnNext(System.out::println) + .map(map -> map.get("v")) + .as(StepVerifier::create) + .expectNext(1, 2, 6, 7, 10) + .verifyComplete(); }