Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand All @@ -20,15 +21,15 @@
*/
public final class PagedIterable<T> implements Iterable<T> {

private final Function<String, PagedResponse<T>> pageRetriever;
private final Function<PagingContext, PagedResponse<T>> pageRetriever;

/**
* Creates an instance of {@link PagedIterable} that consists of only a single page. This constructor takes a {@code
* Supplier} that return the single page of {@code T}.
*
* @param firstPageRetriever Supplier that retrieves the first page.
*/
public PagedIterable(Supplier<PagedResponse<T>> firstPageRetriever) {
public PagedIterable(Function<PagingOptions, PagedResponse<T>> firstPageRetriever) {
this(firstPageRetriever, null);
}

Expand All @@ -40,19 +41,19 @@ public PagedIterable(Supplier<PagedResponse<T>> firstPageRetriever) {
* @param firstPageRetriever Supplier that retrieves the first page.
* @param nextPageRetriever Function that retrieves the next page given a continuation token
*/
public PagedIterable(Supplier<PagedResponse<T>> firstPageRetriever,
Function<String, PagedResponse<T>> nextPageRetriever) {
this.pageRetriever = (continuationToken) -> (continuationToken == null)
? firstPageRetriever.get()
: nextPageRetriever.apply(continuationToken);
public PagedIterable(Function<PagingOptions, PagedResponse<T>> firstPageRetriever,
BiFunction<PagingOptions, String, PagedResponse<T>> nextPageRetriever) {
this.pageRetriever = (context) -> (context.getNextLink() == null)
? firstPageRetriever.apply(context.getPagingOptions())
: nextPageRetriever.apply(context.getPagingOptions(), context.getNextLink());
}

/**
* {@inheritDoc}
*/
@Override
public Iterator<T> iterator() {
return iterableByItemInternal().iterator();
return iterableByItemInternal(new PagingOptions()).iterator();
}

/**
Expand All @@ -62,7 +63,19 @@ public Iterator<T> iterator() {
* @return {@link Iterable} of a pages
*/
public Iterable<PagedResponse<T>> iterableByPage() {
return iterableByPageInternal();
return iterableByPageInternal(new PagingOptions());
}

/**
* Retrieve the {@link Iterable}, one page at a time. It will provide same {@link Iterable} of T values from
* starting if called multiple times.
*
* @param pagingOptions the paging options
* @return {@link Iterable} of a pages
*/
public Iterable<PagedResponse<T>> iterableByPage(PagingOptions pagingOptions) {
Objects.requireNonNull(pagingOptions, "'pagingOptions' cannot be null");
return iterableByPageInternal(pagingOptions);
}

/**
Expand All @@ -71,7 +84,7 @@ public Iterable<PagedResponse<T>> iterableByPage() {
* @return {@link Stream} of value {@code T}.
*/
public Stream<T> stream() {
return StreamSupport.stream(iterableByItemInternal().spliterator(), false);
return StreamSupport.stream(iterableByItemInternal(new PagingOptions()).spliterator(), false);
}

/**
Expand All @@ -84,8 +97,38 @@ public Stream<PagedResponse<T>> streamByPage() {
return StreamSupport.stream(iterableByPage().spliterator(), false);
}

private Iterable<T> iterableByItemInternal() {
return () -> new PagedIterator<>(pageRetriever) {
/**
* Retrieve the {@link Stream}, one page at a time. It will provide same {@link Stream} of T values from starting if
* called multiple times.
*
* @param pagingOptions the paging options
* @return {@link Stream} of a pages
*/
public Stream<PagedResponse<T>> streamByPage(PagingOptions pagingOptions) {
Objects.requireNonNull(pagingOptions, "'pagingOptions' cannot be null");
return StreamSupport.stream(iterableByPage(pagingOptions).spliterator(), false);
}

private static class PagingContext {
private final PagingOptions pagingOptions;
private final String nextLink;

private PagingContext(PagingOptions pagingOptions, String nextLink) {
this.pagingOptions = pagingOptions;
this.nextLink = nextLink;
}

private PagingOptions getPagingOptions() {
return pagingOptions;
}

private String getNextLink() {
return nextLink;
}
}

private Iterable<T> iterableByItemInternal(PagingOptions pagingOptions) {
return () -> new PagedIterator<>(pageRetriever, pagingOptions) {

private Iterator<T> nextPage;
private Iterator<T> currentPage;
Expand Down Expand Up @@ -120,8 +163,8 @@ void addPage(PagedResponse<T> page) {
};
}

private Iterable<PagedResponse<T>> iterableByPageInternal() {
return () -> new PagedIterator<T, PagedResponse<T>>(pageRetriever) {
private Iterable<PagedResponse<T>> iterableByPageInternal(PagingOptions pagingOptions) {
return () -> new PagedIterator<T, PagedResponse<T>>(pageRetriever, pagingOptions) {

private PagedResponse<T> nextPage;

Expand Down Expand Up @@ -152,12 +195,15 @@ void addPage(PagedResponse<T> page) {
private abstract static class PagedIterator<T, E> implements Iterator<E> {
private static final ClientLogger LOGGER = new ClientLogger(PagedIterator.class);

private final Function<String, PagedResponse<T>> pageRetriever;
private final Function<PagingContext, PagedResponse<T>> pageRetriever;
private final Long pageSize;
private String nextLink;
private String continuationToken;
private boolean done;

PagedIterator(Function<String, PagedResponse<T>> pageRetriever) {
PagedIterator(Function<PagingContext, PagedResponse<T>> pageRetriever, PagingOptions pagingOptions) {
this.pageRetriever = pageRetriever;
this.pageSize = pagingOptions.getPageSize();
}

@Override
Expand Down Expand Up @@ -187,7 +233,10 @@ public boolean hasNext() {

void requestPage() {
boolean receivedPages = false;
PagedResponse<T> page = pageRetriever.apply(continuationToken);
PagingOptions pagingOptions = new PagingOptions();
pagingOptions.setPageSize(pageSize);
pagingOptions.setContinuationToken(continuationToken);
PagedResponse<T> page = pageRetriever.apply(new PagingContext(pagingOptions, nextLink));
if (page != null) {
receivePage(page);
receivedPages = true;
Expand All @@ -205,8 +254,10 @@ void requestPage() {
private void receivePage(PagedResponse<T> page) {
addPage(page);

continuationToken = page.getNextLink();
this.done = continuationToken == null || continuationToken.isEmpty();
nextLink = page.getNextLink();
continuationToken = page.getContinuationToken();
this.done = (nextLink == null || nextLink.isEmpty())
&& (continuationToken == null || continuationToken.isEmpty());
Comment on lines +257 to +260

@weidongxu-microsoft weidongxu-microsoft Jan 7, 2025

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code here and above only handle the server-driven pageable (i.e. nextLink or continuationToken).

It does not handle client-side pageable (i.e. via incremental on pageIndex or index).

Handling of client-side pageable would require we get the PagingOptions of the prior request (in additional to this PagedResponse), and update it for next page.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,23 @@ public String getNextLink() {
}

private PagedIterable<TodoItem> list() {
return new PagedIterable<>(() -> listSinglePage(), nextLink -> listNextSinglePage(nextLink));
return new PagedIterable<>((context) -> listSinglePage(context),
(context, nextLink) -> listNextSinglePage(context, nextLink));
}

private PagedResponse<TodoItem> listSinglePage() {
Response<TodoPage> res = listSync();
private PagedResponse<TodoItem> listSinglePage(PagingOptions pagingOptions) {
Response<TodoPage> res = listSync(pagingOptions);
return new PagedResponse<>(res.getRequest(), res.getStatusCode(), res.getHeaders(), res.getBody(),
res.getValue().getItems(), res.getValue().getNextLink());
}

private PagedResponse<TodoItem> listNextSinglePage(String nextLink) {
Response<TodoPage> res = listNextSync(nextLink);
private PagedResponse<TodoItem> listNextSinglePage(PagingOptions pagingOptions, String nextLink) {
Response<TodoPage> res = (nextLink == null) ? listSync(pagingOptions) : listNextSync(nextLink);

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, the code would fork in nextLink. If nextLink exists, it would just request that URL; if nextLink not exist, it would call the same API, but with pagingOptions updated (e.g. continuationToken updated from the prior page).

return new PagedResponse<>(res.getRequest(), res.getStatusCode(), res.getHeaders(), res.getBody(),
res.getValue().getItems(), res.getValue().getNextLink());
}

private Response<TodoPage> listSync() {
private Response<TodoPage> listSync(PagingOptions pagingOptions) {
// mock request on first page
return new HttpResponse<>(null, 200, null, new TodoPage(List.of(new TodoItem(), new TodoItem()), "nextLink1"));

@weidongxu-microsoft weidongxu-microsoft Jan 7, 2025

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code here should set the value of pagingOptions.continuationToken into the proxy API.
(it also means emitter/codegen need to do special handling for continuationToken param, as it should appear on proxy API, but not on client method API)

}
Expand Down