Закрытое тестирование API для оповещения о заказах из Турбо-корзины

Заказы из Турбо-корзины пока можно получать только на почту и в 1С-Битрикс. Вы не раз просили нас добавить возможность передавать заказы в CMS или CRM-систему. Которой вы пользуетесь. Отличные новости: скоро у магазинов на Турбо появится API. Благодаря которому заказы из Турбо-корзины будут попадать сразу в вашу CMS/CRM-систему.

Чтобы сделать API ещё удобнее. Мы проводим закрытое бета-тестирование. Вы можете поучаствовать в нём. Если ваш магазин подключён к Турбо и вы готовы включить Турбо-корзину. Хотите первыми опробовать новую функциональность. Поделиться впечатлениями и рассказать нам. Что улучшить? Оставьте заявку — и мы свяжемся с вами.

Спасибо за помощь!

Команда Яндекс.Вебмастера

P. S. Подписывайтесь на наши каналы:
Блог Яндекса для вебмастеров
Канал Яндекса на YouTube о продвижении сайтов
Канал для владельцев сайтов в Яндекс.Дзене

Google пересмотрел сроки отказа от поддержки сторонних файлов cookie в Chrome. Согласно новым планам. Это будет сделано до конца 2023 года. В компании отметили. Что данный процесс требует больше времени. Чем предполагалось ранее.

К концу 2022 года Google планирует внедрить в Chrome ключевые технологии из тех. Что разрабатываются в рамках инициативы Privacy Sandbox. Они помогут рекламодателям сохранять эффективность в «мире без cookie».

Отказ от поддержки third-party cookies будет реализован в два этапа:

  • Первый этап (начнется в конце 2022 года). После того, как будет завершено тестирование и новые API будут запущены в Chrome. Google объявит о начале первого этапа. В течение этой стадии издатели и представители рекламной индустрии смогут перенести свои сервисы на новые инструменты. По оценкам Google. Этот этап продлится 9 месяцев. В компании будут тщательно следить за переходом и обратной связью. Прежде чем перейти на второй этап.
  • Второй этап (начнется в середине 2023 года). В течение трех месяцев Chrome постепенно отключит поддержку third-party cookies. Этот процесс продлится до конца 2023 года.

В скором времени Google предоставит более подробное расписание на сайте privacysandbox.com и будет регулярно обновлять эту информацию. Чтобы разработчики и издатели могли планировать свои действия.

О том, что Chrome откажется от поддержки third-party cookies, стало известно в январе 2020 года. На тот момент сообщалось. Что этот процесс будет завершен в ближайшие два года. То есть к началу 2022 года.

Google считает. Что новые реалии требуют новых подходов к обеспечению релевантности рекламы. При этом сведения. Которые передаются сайтам и рекламодателям. Должны быть минимизированы путем агрегирования и хранения большего количества данных непосредственно на устройстве.

На сегодняшний день Chrome и другие компании предложили более 30 решений на замену third-party cookies. Четыре из которых доступны в первоначальных пробных версиях.

Тестирование нового метода таргетинга – федеративного обучения на основе когорт – было начато в марте. При этом большинство крупных браузеров и других игроков рынка не поддержали это решение. Часть из них отказалась его внедрять (DuckDuckGoBrave), другие заняли выжидательную позицию (Firefox, Edge. Safari и Opera). В оппозицию также ушли WordPress и Amazon.

Материалы по теме:

Привет all!

Вступление

Приглянулась мне однажды идея реверс-инженеринга (реконструкции) StreamAPI из JDK8. Что и как из этого вышло опишу далее.

Ссылки

Актуальный репозиторий

Ветка актуальная для статьи

Вкратце

StreamAPI — это технология появившаяся в Java 8 позволяющая манипулировать данными в наборах (напр. коллекциями) в функциональном стиле (лямбда-выражениями). Более подробно про стримы можно почитать тут.

Зачем и для чего

Для обучения. Для более глубокого понимания исследуемой темы. В процессе написания данной статьи было получено немало опыта. Как по стримам. Так и в целом по Java и программированию. Поэтому эта риверсия является неплохой обучающей практикой. Я бы порекомендовал и новичкам и тем кто уровнем повыше выполнить самостоятельную реализацию стрима. Поверьте, независимо от вашего опыта. Вас ждут открытия 🙂

Название

Название было выбрано следующим образом: Stream Reversed → StreamRe → StreamEr → Streamer

Возможные возможности/невозможности

Поскольку это обучающий реверс-инженеринг. Целью которого является максимально понятно реализовать уже реализованное. То не стоит ожидать от стримера эффективного расхода памяти и быстродействия уровня Enterprise. Более того, в конце статьи я приведу пример сравнения производительности разных стримов. В котором стример уступает оригиналу из JDK по быстродействию.

Так же из реализации была исключена возможность распараллеливания стримов (Parallel stream). Т.к. приемлемая реализация этого подхода потребует иных принципов построения и выходит за рамки этого материала.

Описанная тут реализация сохраняет следующие преимущества гибкости StreamAPI:

  • чтения данных не происходит до вызова одного из терминальных методов. Т.е., пока мы собираем стример. Мы никак не влияем на источник данных. Не читаем его. А только формируем набор правил. По которым будет работать конвеер стрима. Когда он будет запущен вызовом одного из терминальных методов.

  • после вызова терминального метода. Данные последовательно читаются из родительского набора и так же последовательно проходят по цепочке операций. Не накапливаясь при этом в коллекциях. Массивах и т.д. Конечно, за исключением метода сортировки. Который по очевидным причинам требует предварительного накопления данных.

Spliterator vs. Iterator

Для того чтобы Stream мог функционировать. Ему необходим источник данных. Стандартная реализация JDK (далее «оригинал»). Под капотом. Для чтения источника. Использует сплитератор — Spliterator<T>.

Основная задача сплитератора — разделять данные на блоки. Т.е. порцировать их. Порцирование используется «оригиналом» для воможности распараллеливания стримов. Когда разные порции обрабатываются разными потоками. Более подробно. О сплитераторах можно почитать тут.

Поскольку мы не будем реализовывать parallel для стримера. То и в разделении данных на блоки у нас тоже нет необходимости. Для простоты примера хватит итераторa — Iterator<T>, поэтому «под капотом» именно через него и будем получать данные из родительского источника.

Жизненный цикл (внутренние состояния)

Жизненный цикл стримера я разделил на три состояния:

  • Ожидание (WAITING) — начальное состояние стримера. В этом состоянии экземпляры создаются. Пока стример находится в этом состоянии. Мы можем конструировать его из операций и вызвать один из терминальных методов когда потребуется «включить конвеер».

  • В работе (OPERATED) — в это состояние стример переходит после вызова любого терминального метода. Это состояние означает. Что либо стример находится в работе — «конвеер запущен». Либо готов к запуску. Т.е. уже сконструирован. А значит вызовы как конвеерных так и терминальных методов более невозможны.

  • Завершен (CLOSED) — Это состояние означает. Что стример завершил выполнение работы. Ссылка на внешний итератор-источник обнулена (RefCount для GC). В это состояние стример переходит после того как:

    • завершилась работа любого из терминальных методов. Даже если в источнике остались данные. Например findFirst() вернул «первый» элемент. Данные возможно еще остались. Но стример отработал свою задачу и может освободить не используемые ссылки.

    • В источнике закончились данные — hasNext() изменил свое состояние с true на false.

    • Извне, был вызван метод явного закрытия — close() и при этом стример находился в WAITING состоянии. Данное условие (WAITING) является обязательным. Поскольку мы не можем по запросу завершать работающий стрим. Таковы правила. Далее я это рассмотрю.

Подготовка

Разделим методы,стрима которые будем реализовывать на три группы:

Порождающие (factory): empty. Of, generate. Iterate, concat

Промежуточные(intermediate)/конвеерные: peek, onClose. Distinct, filter. Skip, limit. Sorted, map. MapToInt, mapToLong. MapToDouble, flatMap. FlatMapToInt. FlatMapToLong. FlatMapToDouble

Завершающие (terminal): spliterator. Parallel, unordered. ForEachOrdered. Collect, min, max. Reduce, count. ForEach, allMatch. AnyMatch, noneMatch. FindFirst, findAny. Iterator, toArray

Прочие: close, isParallel, sequential

Итак, создадим проект с начальной структурой и классом Streamer<T>, реализующий интерфейс java.util.stream.Stream<T>. Позволим IDE сгенерировать пустую реализацию всех методов (перечислены выше). Сгенерированные методы заглушим при помощи UnsupportedOperationException.

В итоге должно получиться примерно так.

Так же, сразу напишем реализацию простых методов — «однострочников» чтобы более к ним не возвращаться.

@Override
public Optional<T> findFirst() { return findAny(); //поскольку у нас упорядоченный стрим. То первый элемент (First) и есть "произвольный" (Any)
} @Override
public boolean isParallel() { return false; //мы не поддерживаем параллелизм. Поэтому всегда false
} @Override
public Stream<T> sequential() { return this; //мы "последовательны". Поэтому вернем себя же
} @Override
public void forEachOrdered(Consumer<? super T> action) { forEach(action); //опять же. Мы упорядочены источником. Поэтому в нашем случае forEach и forEachOrdered эквивалентны
} @Override
public Spliterator<T> spliterator() { return Spliterators.spliteratorUnknownSize(this.iterator(). Spliterator.ORDERED); //создадим сплитератор на основе «внутреннего» итератора
} @Override
public Stream<T> unordered() { return this; //так же. Можно вернуть себя
}

Создание экземпляров

Под капотом. Экземпляры будут создаваться единственым закрытым (private) конструктором. Который в качестве аргумента принимает внешний итератор-источник. Этот итератор и будем использовать в качестве источника данных. Клиенты же, как и в оригинале. Будут получать экземпляры стримера из статических фабрик. Стоит добавить. Что к статической фабрике of() я дополнительно добавил перегруженные методы получения экземпляров Streamer<T> из коллекций. Перечисляемых (Iterable) типов. И непосредственно из самих Iterator`ов.

Примеры порождения стримера:

package pw.komarov.streamer; import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom; public class StreamerInstancesCreationExamples { public static void main(String[] args) { Streamer.empty(); //пустой Streamer.of(new Object()); //единичный объект Streamer.of(new Integer[]{1, 4, 8, 17}); //массив Streamer.of(Arrays.asList(7.34, 9, 18.7, 3)); //Iterable (List) Streamer.of("Foo", "Bar". "Juice", "hello". "streamer"); //из констант //Infinite Streamer.generate(() -> ThreadLocalRandom.current().nextInt()); //бесконечный (рэндом-число) Streamer.generate(() -> { List strings = Arrays.asList("randomly", "returned", "string", "value"); return strings.get(ThreadLocalRandom.current().nextInt(strings.size())); }); //рэндом значение Streamer.iterate(100, (i) -> i * 2); //последовательность {100,200,400.........n} }
}

Методы generate() и iterate() порождают бесконечный стрим. Который на каждом шаге получает значение из бесконечного итератора. У которого hasNext() всегда == true и «заглушен» метод forEachRemaining():

private static abstract class AbstractInfiniteIterator<E> implements Iterator<E> { @Override public boolean hasNext() { return true; } @Override public void forEachRemaining(Consumer<? super E> consumer) { throw new UnsupportedOperationException(); }
}

Итератор для generate():

private static class InfiniteGenerator<E> extends AbstractInfiniteIterator<E> { private final Supplier<E> supplier; InfiniteGenerator(Supplier<E> supplier) { this.supplier = supplier; } @Override public E next() { return supplier.get(); }
}

Далее, сначала создаем экземпляр этого генерирующего итератора. И затем из него стример:

public static <E> Streamer<E> generate(Supplier<E> supplier) { return of(new InfiniteGenerator<>(supplier));
}

Похожим образом реализован и iterate():

public static class InfiniteIterator<E> extends AbstractInfiniteIterator<E> { private E value; //значение предыдущего шага. При первом вызове — initial private final UnaryOperator<E> unaryOperator; //клиенсткая функция генерации значения InfiniteIterator(E initial. UnaryOperator<E> unaryOperator) { this.value = initial; this.unaryOperator = unaryOperator; } @Override public E next() { E prev = this.value; this.value = unaryOperator.apply(prev); return prev; }
} public static <E> Streamer<E> iterate(E initial. UnaryOperator<E> unaryOperator { return of(new InfiniteIterator<>(initial. UnaryOperator));
}

В итоге должно получиться примерно так.

Закрытие/завершение

Опишем два метода закрытия/завершения стримера. Первый — internalClose() для внутреннего использования. Вызывать его будем когда работа стримера логически завершена. Например закончились данные в источнике или завершена работа одного из терминальных методов. В общем, в тех случаях. Когда использование стримера более невозможно. Этот метод будет так же обнулять ссылки на внешние ресурсы (чтобы уменьшить RefCount для GC) и переводить стример в CLOSED состояние.

Второй метод — внешнего закрытия. Реализует close() интерфейса AutoCloseable. Фактически же. Завершает стример только из состояния WAITING. Это сделано для того. Чтобы внешний вызов не мог повлиять на работу выполняющегося стрима. Так работает оригинал. На мой взгляд это поведение не логично. И вот почему… Предположим. Что стрим выполняет тяжеловесную операцию одним из терминальных методов. В какой то момент (к примеру. Пользователь запросил отмену действия). Мы понимаем что больше не нуждаемся в этой тяжеловесной работе и хотим ее принудительно прекратить. Стрим исполняется в другом потоке. Но у нас есть указатель на этот стрим. Вызываем close() в надежде прекратить выполнение операции. Но он продолжает работать как ни в чем небывало… А жаль… Ведь так хотелось… :).

Второй важной частью работы этого метода является вызов пользовательских onClose последовательностей. Но и тут скрывается подвох. В оригинальном стриме эти onClose выполняются только в случае явного вызова метода close(). Т.е. если стрим завершил работу. Допустим найдено искомое (min. Max и т.д.). То onClose будут просто проигнорированы. А ведь возможно там были важные финализаторы… При описанном поведении инструмент предоставляемый методом onClose() вообще не представляет практической ценности. Поскольку те же самые операции можно вызвать «вручную» из клиентского кода. После вызова close() например. Можно будет даже более гибко обработать возможные исключения.

Ну что же, имеем то. Что имеем… поэтому для поддержания совместимости реализуем эти особенности в том же виде:

private enum State {WAITING, OPERATED. CLOSED} private State state = State.WAITING; private final List<Runnable> onCloseSequences = new LinkedList<>(); @Override
public void close() { if (state == State.WAITING) internalClose(); //обработаем (выполним) клиентские onClose последовательности... RuntimeException rte = null; for (Iterator<Runnable> iterator = onCloseSequences.iterator(); iterator.hasNext(); ) { Runnable runnable = iterator.next(); try { runnable.run(); } catch (RuntimeException e) { if (rte == null) //если это первое исключение в цепочке... rte = e; //...сохраним его else //если не первое... rte.addSuppressed(e); //...сохраним его в suppressed первого } finally { iterator.remove(); } } if (rte != null) throw rte;
} private void internalClose() { externalIterator = null; state = State.CLOSED;
} private void throwIfNotWaiting() { if (state != State.WAITING) throw new IllegalStateException("stream has already been operated upon or closed");
} @Override
public Stream<T> onClose(Runnable closeHandler) { throwIfNotWaiting(); onCloseSequences.add(closeHandler); return this;
}

Контракт onClose() для стрима гласит. Что первое исключение погашается и сохраняется. Прочие исключение (если они есть). Добавляются в suppressed первого. И если было первое. То оно и бросается после выполнения всех onClose`ов. Этот контракт так же сохранен в реализации приведенной выше.

Расстановка по шаблону

Ранее мы реализовали метод проверки текущего состояния стримера. Который бросает IllegalStateException если стример не в WAITING состоянии. Теперь пришло время его расставить в места где это нужно. А нужно это сделать во всех терминальных и конвеерных методах. Кроме «однострочников» описанных ранее. Т.к. они все равно ссылаются на эти методы.

Поскольку конвеерные методы будут работать по принципу Builder`a — иметь возможность телескопического построения (прим.: object.method1().method2().method3().methodN()…), то каждый из этих методов должен возвращать экземпляр себя. В итоге шаблон конвеерного метода приобрел такой вид:

{ throwIfNotWaiting(); //todo: тут будет создание и добавление операций return this;
}

Каждый терминальный метод должен переводить стример из WAITING в OPERATED состояние. А по завершению работы — корректно закрывать его. Резюмируя вышесказанное. «шаблон» терминального метода приобретает такой вид:

{ throwIfNotWaiting(); //IllegalStateException если пытаемся использовать запущенный или завершенный стример state = State.OPERATED; //переведем в OPERATED try { ;//todo: терминальные операции… } finally { internalClose(); //выполним завершение } throw new UnsupportedOperationException("will be soon"); //чтобы не забыть про return :)
}

В итоге получилось так.

Промежуточные операции (intermediate/conveyor)

Ну вот мы и подошли к логике работы стримера. Как известно. Стрим состоит из набора операций. Которые последовательно применяются к данным которые представлены этому стриму. Поставим вопрос. Как будем хранить и как будем «строить» наборы этих операций? Тут все очень просто.

Для обозначения самой операции. Объявим интерфейс:

private interface IntermediateOperation {}

Набор операций — список элементов этого интерфейса:

private List<IntermediateOperation> intermediateOperations = new LinkedList<>();

А добавлять в этот список конкретные операции будем из конвеерных методов.

Из всех конвеерных операций стрима выделим отдельную группу — фильтрующие операции. Это операции. Которые на основании некоторого условия (предиката). Зависящего от типа операции. Определяют — пройдет ли элемент данных далее по конвееру или будет отброшен на текущем шаге. Вот список всех конвеерные методов. Относящихся к фильтрующим операциям: skip(), limit(), distinct(), filter().

Для обозначения этих операций. Объявим еще один интерфейс:

private interface FilteringOperation<T> extends IntermediateOperation. Predicate<T> {}

Predicate<T> является функциональным интерфейсом (FunctionalInterface. Подробнее https://habr.com/ru/post/512730/), и его функциональный метод — boolean test(). Реализацией этого метода в конкретной операции мы и будем определять. Пройдет ли элемент по конвееру дальше. Или будет «отброшен».

Вот так будет выглядеть класс конкретной операции (в приведенном случае skip):

private static class SkipOperation implements FilteringOperation { private final long totalCount; //количество элементов которые требуется "пропустить" private long processedCount; //количество уже "пропущеных" элементов текущей операцией SkipOperation(long totalCount) { this.totalCount = totalCount; } @Override public boolean test(Object o) { if (processedCount < totalCount) { processedCount++; return true; //пропустим элемент далее } return false; //отбросим/отфильтруем элемент }
} @Override
public Stream<T> skip(long n) { throwIfNotWaiting(); //проверим текущее состояние intermediateOperations.add(new SkipOperation(n)); //создадим Skip-операцию. И добавим ее в список операций. return this; //вернем экземпляр «себя» для возможности телескопического построения
}

По такому же принципу реализуем добавление остальных фильтрующих операций:

//limit()
private long filteredByLimit; //количество "отсеяных" limit'ом элементов private class LimitOperation implements FilteringOperation { private final long maxSize; //собственно и есть лимит LimitOperation(long maxSize) { this.maxSize = maxSize; } @Override public boolean test(Object o) { return maxSize < ++filteredByLimit; }
} @Override
public Stream<T> limit(long maxSize) { throwIfNotWaiting(); intermediateOperations.add(new LimitOperation(maxSize)); return this;
} //distinct() private static class DistinctOperation implements FilteringOperation { private Set<Object> objects = new HashSet<>(); @Override public boolean test(Object o) { return !objects.add(o); }
} @Override
public Stream<T> distinct() { throwIfNotWaiting(); intermediateOperations.add(new DistinctOperation()); return this;
} private static class FilterOperation<T> implements FilteringOperation<T> { private final Predicate<? super T> predicate; public FilterOperation(Predicate<? super T> predicate) { this.predicate = predicate; } @Override public boolean test(T t) { return !predicate.test(t); }
} @Override
public Stream<T> filter(Predicate<? super T> predicate) { throwIfNotWaiting(); intermediateOperations.add(new FilterOperation<>(predicate)); return this;
}

Не фильтрующие:

//sorted()
public static class SortedOperation<T> implements IntermediateOperation { private final Comparator<? super T> comparator; public SortedOperation() { this.comparator = null; } public SortedOperation(Comparator<? super T> comparator) { this.comparator = comparator; }
} @Override
public Stream<T> sorted() { throwIfNotWaiting(); intermediateOperations.add(new SortedOperation<>()); return this;
} @Override
public Stream<T> sorted(Comparator<? super T> comparator) { throwIfNotWaiting(); intermediateOperations.add(new SortedOperation<>(comparator)); return this;
} //map()
private static class MapOperation<T, R> implements IntermediateOperation { private final Function<? super T. ? extends R> function; MapOperation(Function<? super T. ? extends R> function) { this.function = function; }
} @SuppressWarnings("unchecked")
@Override
public <R> Stream<R> map(Function<? super T. ? extends R> mapper) { throwIfNotWaiting(); intermediateOperations.add(new MapOperation<>(mapper)); return (Streamer<R>) this;
} //flatMap()
private static class FlatMapOperation<T, R> implements IntermediateOperation { private final Function<? super T. ? extends Stream<? extends R>> function; FlatMapOperation(Function<? super T. ? extends Stream<? extends R>> function) { this.function = function; }
} @SuppressWarnings("unchecked")
@Override
public <R> Stream<R> flatMap(Function<? super T. ? extends Stream<? extends R>> mapper) { throwIfNotWaiting(); intermediateOperations.add(new FlatMapOperation<>(mapper)); return (Streamer<R>) this;
}

Отдельно тут стоит отметить peek. peek-непьющий трудовик. peek-операции. Как и onClose. Будем хранить в отдельном списке. Без классов-оберток над ним. Т.к. peek хоть и Intermediate операции. Но работают немного по другому принципу — peek-последовательности выполняются ВСЕ, РАЗОМ. ПОСЛЕ вычисления КАЖДОГО элемента.

//peek() private final List<Consumer<? super T>> peekSequences = new LinkedList<>(); @Override
public Stream<T> peek(Consumer<? super T> action) { throwIfNotWaiting(); peekSequences.add(action); return this;
}

В итоге.

Конвеерная логика

Теперь реализуем главное — логику работы «конвеера». «На пальцах» можно описать работу этого механизма примерно так: внешний итератор (externalIterator) получает элемент от источника. Затем он проходит (или не проходит) по конвееру и передается запросившему клиенту через «обратный» (streamerIterator) итератор. Похоже на систему водопровода — когда вода подается в систему насосом (насос тут externalIterator, качает речную воду). Проходит по трубам. Фильтрам (конвеер). Которые отсеивают нежелательные элементы. И подается потребителю по средствам открытия крана. Кран для потребителя — streamerIterator.

Если где-то. На этом пути элемент был отброшен (отфильтрован). То из источника будет запрошен следующий. И так далее. До тех пор, пока либо в итераторе-источнике не закончатся данные и в этом случае мы так же сообщим потребителю: «данных для Вас больше нет» (hasNext() == false), либо представим этот элемент потребителю.

Реализация конвеера (водопровода):

private class StreamerIterator implements Iterator<T> { private Boolean hasNext; private T next; @Override public boolean hasNext() { if (hasNext == null) { calcNextAndHasNext(); if (!hasNext && state != State.CLOSED) //если нет больше данных... internalClose(); //...завершим } return hasNext; } @Override public T next() { if (!hasNext()) //запросили. Но нет больше элементов?... throw new NoSuchElementException(); //... получите exception hasNext = null; //переведем состояние hasNext в «неизвестно» return next; } private void calcNextAndHasNext() { //метод расчитывающий внутренние закрытые поля next и hasNext на основании наличия опционала из getNext() Optional<T> opt = getNext(intermediateOperations); //noinspection OptionalAssignedToNull hasNext = opt != null; //если опционал не null — естьСледующий = true if (hasNext) //а если следующий есть — то... next = opt.orElse(null); //… это значение из опционала (если значения в опционале нет — то оно null } //водопровод: @SuppressWarnings("unchecked") private Optional<T> getNext(List<IntermediateOperation> operations) { T next = null; boolean terminated = false; boolean hasNext = externalIterator.hasNext(); while (hasNext && !terminated) { next = externalIterator.next(); boolean filtered = false; for (IntermediateOperation operation : operations) //пройдем по всем операциям if (operation instanceof FilteringOperation) { //если операция - фильтрующая... if (!filtered) { //… и не была отфильтрована ранее filtered = ((FilteringOperation<T>) operation).test(next); //фильтруем? (test`ом) if (filtered && operation instanceof LimitOperation) terminated = true; //а если была отфильтрована лимитной — то еще и прервем while } } else if (operation instanceof MapOperation) //если map- операция next = (T) ((MapOperation)operation).function.apply(next); else throw new UnsupportedOperationException("getNext(): " + operation.getClass().getSimpleName()); //неизвестная if (!filtered) break; else hasNext = externalIterator.hasNext(); } if (hasNext && !terminated) { //применим к полученному в итоге значению peek-операции for (Consumer<? super T> peekSequence : peekSequences) peekSequence.accept(next); return Optional.ofNullable(next); } //noinspection OptionalAssignedToNull return null; }
}

Объявим переменную (а это — кран водопровода):

private final StreamerIterator streamerIterator = new StreamerIterator();

P.S. Тут есть один нюанс. Optional у меня может быть null`ом. Да может, и это его логичное на мой взгляд применение. Не нашли значение — опционал == null. Нашли значение. Опционал его содержит (даже если оно null). Иначе какая польза от этого опционала? Да никакой! К тому же, такое его использование осуществляется только внутри закрытых методов. А значит не нарушает никаких внешних соглашений. Но, в своем рабочем коде я использую свой класс NullableOptional<>. Помимо того что он может быть EMPTY (в случаях когда значение не найдено). В нем еще есть и некий сахар. Например elseIf(), которого мне переодически нехватает в JDK Optional<> как дополнение для ifPresent(). К сожалению Optional<> объявлен как final и поэтому мой NullableOptional растет отдельной иерархией. Если кому интересно. Можете глянуть (покрытие unit-тестами прилагается):

https://github.com/koma1/Streamer/compare/NullableOptional

Наладочный пуск

Настало время выполнить первый тестовый запуск. Для этого добавим реализацию двух терминальных методов: iterator() и forEach():

@Override
public Iterator<T> iterator() { throwIfNotWaiting(); //бросим исключение если не в WAITING состоянии state = State.OPERATED; //сменим состояние на OPERATED return streamerIterator; //вернем «внутренний» итератор - (кран)
} @Override
public void forEach(Consumer<? super T> action) { throwIfNotWaiting(); state = State.OPERATED; while (streamerIterator.hasNext()) //пока в «кране» есть вода... action.accept(streamerIterator.next());//...применим action к этой воде
}

Коммитушка

Протестируем:

final Stream<?> stream = Streamer .of(108, 5, 12, 11, 4, 9, 7, 5) //инстанциируем стример из набора констант .distinct() //(108, 5, 12, 11, 4, 9, 7, [5]) - отбросим дубли .skip(1) //([108], 5, 12, 11, 4, 9, 7) - отбросим из начала - один элемент .limit(6) //(5, 12, 11, 4, 9, 7) - лимитируем выборку в шесть элементов .limit(5) //(5, 12, 11, 4, 9, [7]) - их всего шесть,,,, значит лимитируем в пять :) .map(i -> i == 11 ? 12 : i) //(5, 12, [11]->12, 4, 9) //там где значение == 11, заменим на 12, в других случаях - оставим как есть .distinct() //(5, 12, [12], 4, 9) //повторно отсеим новые дубли .map(i -> (i & 1) == 1 ? i * 2 : i) //([5]->10, 12, 4, [9]->18) //каждое нечетное умножим на два, остальные оставим как есть .skip(1) //([10], 12, 4, 18) - отбросим из начала - один элемент .map(String::valueOf) //("12", "4". "18") - преобразуем в строковые (изменится и тип стрима. Поэтому он объявлен как <?>) .map(s -> s.equals("12") ? "twelve" : s.equals("18") ? "eighteen" : String.format("(%s)unknown". S)) //то, что знаем. Преобразуем в строки ; stream.forEach(System.out::println); //("twelve". "(4)unknown". "eighteen")

Версия на github`e

А вот и результат его выполнения:

twelve
(4)unknown
eighteen Process finished with exit code 0

Вполне ожидаемый. Для проверки можно применить «хитрость». Заменить Streamer.of на Stream.of и посмотреть как отработает «оригинал». Результат в обоих случаях должен быть одинаковый. Ну вот мы и реализовали большинство конвеерных методов и два терминальных. Которых достаточно для проверки работы стримера. Из конвеерных. Пока не реализованы: sorted() и мэпперы (mapTo…, flatMap(). FlatMapTo…). Эти методы имеют некоторые особенности. Поэтому рассмотрим их реализацию отдельно.

sorted()

Как следует из названия — данный метод сортирует данные в стриме. Делает он это Comparator`ом представленным в аргументе. Либо компаратаром для представленного в стриме типа (он должен быть Comparable с собой же. Иначе — ClassCastException).

Особенностью работы данного метода является то. Что операция сортировки не является последовательной. Т.е. требует предварительного накопления всех имеющихся в источнике данных.

Рассмотрим условный пример:

	streamer.iterate(…) .limit(…) // 1.1 .sorted() //1 .distinct(…) //2.1 .filter(…) //2.2 .sorted() //2 .map(…) //3.1 .distinct(…) //3.2 .sorted() //3 .skip(…) //n.1 .filter(…) //n.2

В этом примере операции можно разделить на три условных блока с сортировкой в конце каждого (последние операции «n.1» и «n.2» не замыкаются сортировкой. Поэтому не входят в условный блок). Для того. Чтобы выполнить этот пример мы должны пойти примерно следующим путем:

  1. вычитать весь «внешний источник-итератор» в какой либо контейнер (коллекция, массив. Файл и т.д.)

  2. «Прогнать» элементы получившегося контейнера. Последовательно по конвееру операций текущего «условного блока»

  3. отсортировать этот контейнер с применением указанного в сорировщике компаратора (либо компаратором по умолчанию в случае отсутствия первого).

  4. заменить указатель внешнего итератора-источника. На итератор этой отсортированной коллекции. Теперь наш источник. Это нами же порожденный внутренний контейнер.

  5. сменить «условный блок» на следующий и выполнить данный алгоритм заново. С пункта №1. Если это был последний «условный блок». То считать выполнение сортировок оконченой и вернуть управление конвееру с итератором. Указывающим на итератор отсортированный ранее коллекции.

Если тоже самое выразить языком кода. То у меня получился следующий набор изменений:

//sorted()
private int sortedCount; //поле стримера. Хранит кол-во операций сортировки ... intermediateOperations.add(...);
sortedCount++; ... @Override
public boolean hasNext() { if (hasNext == null) { if (sortedCount > 0) //если есть сортировки... calculateSorted(); //… выполним сначала их... calcNextAndHasNext();
...
@SuppressWarnings({"OptionalAssignedToNull","unchecked"})
private void calculateSorted() { for (int i = 1; i <= sortedCount; i++) { //цикл по «условным блокам» final List<IntermediateOperation> localOperations = new LinkedList<>(); SortedOperation<T> sortedOperation = null; for (Iterator<IntermediateOperation> itr = intermediateOperations.iterator(); itr.hasNext(); ) { IntermediateOperation operation = itr.next(); try { if (operation instanceof SortedOperation) { //если операция сортировки — выделим этот блок... sortedOperation = (SortedOperation<T>) operation; break; } else localOperations.add(operation); } finally { itr.remove(); } } //на основании «условного блока» соберем коллекцию final List<T> data = new ArrayList<>(); Optional<T> nextOpt; do { nextOpt = getNext(localOperations); if (nextOpt != null) data.add(nextOpt.orElse(null)); } while (nextOpt != null); //отсортируем получившийся список... if (sortedOperation != null) data.sort(sortedOperation.comparator); //подменим итератор externalIterator = data.iterator(); }
}

diff commit`a

В этом коммите так же изменен и StressRunner — добавлено несколько сортировок.

flatMap()

Этот метод порождает новый стрим. «раскрывая/разворачивая/расхлопывая» элементы родительского стрима. Схематично это можно отобразить так:

	- element1 - subelement1_FROM_element1 - subelement2_FROM_element1 - subelement3_FROM_element1 - element2 - subelement4_FROM_element2 - subelement5_FROM_element2

Простейший способ реализации может выглядеть так:

@Override
public <R> Stream<R> flatMap(Function<? super T. ? extends Stream<? extends R>> mapper) { Objects.requireNonNull(mapper); throwIfNotWaiting(); Stream<R> result = Stream.empty(); for (T t : this) result = Stream.concat(result. Mapper.apply(t)); return result;
}

Это решение не совсем корректно. В нем порождаемый стрим не является последовательным. Данные в нем накапливаются и затем соединяются (конкатенируются) в общий итоговый результат. Это наглядно демонстрирует пример.

Запустив его. Мы увидим единоразовый. Массовый «выброс» результата в консоль. Но нас это не устраивает. Мы предпочитаем «последовательность» «массовости». Исправить это можно реализовав например такую идею: Опишем класс итератора результатирующего типа — Iterator<R>. На вход ему будем передавать итератор текущего стрима (назовем его OfT) и клиентскую функцию mapper которая будет раскладывать элементы полученные из — OfT на элементы подмножества — ofR, которые и будем по одному возвращать клиенту. Звучит запутанно. Не совсем понятно. Поэтому лучше смотреть код:

//flatMap() @Override
public <R> Stream<R> flatMap(Function<? super T. ? extends Stream<? extends R>> mapper) { Objects.requireNonNull(mapper); class IteratorOfR implements Iterator<R> { private final Iterator<T> OfT = Streamer.this.iterator(); //родительский итератор (содержит элементы множества. Которые будем раскладывать) private Iterator<? extends R> ofR; //элементы подмножества Stream<R>, текущего элемента из ofT. Которые и будем возвращать конечному клиенту @Override public R next() { if (!hasNext()) throw new NoSuchElementException(); return ofR.next(); } @Override public boolean hasNext() { while ((ofR == null || !ofR.hasNext()) && OfT.hasNext()) //если ofR не задан (напр.: первый запрос). Или в ofR отсутствуют элементы и при этом есть что раскладывать в родительском (ofT)... ofR = mapper.apply(OfT.next()).iterator(); //...разложим элемент из ofT в подмножество ofR return ofR != null && ofR.hasNext(); } } return Streamer.of(new IteratorOfR());
}

Кстати, IteratorOfR я решил сделать вложенным (enclosure) классом. Так как его использование за пределами метода flatMap() не предполагается.

https://github.com/koma1/Streamer/commit/51dc50c2c3ff9c429d1ed7b3046081e8e664f396

[flat]MapTo{Int/Long/Double}()

Этот набор методов стоит рассмотреть отдельно. Все они возвращают стрим одного из трех типов: IntStream, LongStream, DoubleStream. Спецификой данных типов стримов. Является то. Что они оперируют примитивами. А не обертками. Изза этого, например IntStream гораздо быстрее работает с числами. Чем Stream<Integer>, ведь первый работает со значением. А второй с оберткой. Реализация этих методов схожа с реализацией flatMap() описанной выше. Тут стоит правда отметить. Что поскольку mapTo[Int/Long/Double]() возвращают указанные выше типы стримов. То принцип их реализации немного отличается от обычного map() и более похож на реализацию flatMap(), за тем исключением что элементы родительского стрима не раскладываются на подмножества. А модифицируются соответствующим mapper`ом и возвращаются по одному. Звучит запутанно. Смотрим код:

@Override
public IntStream mapToInt(ToIntFunction<? super T> mapper) { Objects.requireNonNull(mapper); class OfInt implements PrimitiveIterator.OfInt { @Override public int nextInt() { return mapper.applyAsInt(streamerIterator.next()); } @Override public boolean hasNext() { return streamerIterator.hasNext(); } } return StreamSupport .intStream( Spliterators.spliteratorUnknownSize( new OfInt(), 0), false);
}

Оставшиеся методы …mapTo…() сделаны схожим образом. Поэтому просто приведу их код:

@Override
public LongStream mapToLong(ToLongFunction<? super T> mapper) { Objects.requireNonNull(mapper); class OfLong implements PrimitiveIterator.OfLong { private final Iterator<T> ofT = Streamer.this.iterator(); @Override public long nextLong() { return mapper.applyAsLong(ofT.next()); } @Override public boolean hasNext() { return ofT.hasNext(); } } return StreamSupport .longStream( Spliterators.spliteratorUnknownSize( new OfLong(), 0), false);
} @Override
public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) { Objects.requireNonNull(mapper); class OfDouble implements PrimitiveIterator.OfDouble { private final Iterator<T> ofT = Streamer.this.iterator(); @Override public double nextDouble() { return mapper.applyAsDouble(ofT.next()); } @Override public boolean hasNext() { return ofT.hasNext(); } } return StreamSupport .doubleStream( Spliterators.spliteratorUnknownSize( new OfDouble(), 0), false);
} @Override
public IntStream flatMapToInt(Function<? super T. ? extends IntStream> mapper) { Objects.requireNonNull(mapper); class OfInt implements PrimitiveIterator.OfInt { private final Iterator<T> ofT = Streamer.this.iterator(); private PrimitiveIterator.OfInt ofInt; @Override public int nextInt() { if (!hasNext()) throw new NoSuchElementException(); return ofInt.next(); } @Override public boolean hasNext() { while ((ofInt == null || !ofInt.hasNext()) && ofT.hasNext()) ofInt = mapper.apply(ofT.next()).iterator(); return ofInt != null && ofInt.hasNext(); } } return StreamSupport.intStream( Spliterators.spliteratorUnknownSize(new OfInt(), 0), false );
} @Override
public LongStream flatMapToLong(Function<? super T. ? extends LongStream> mapper) { Objects.requireNonNull(mapper); class OfLong implements PrimitiveIterator.OfLong { private final Iterator<T> ofT = Streamer.this.iterator(); private PrimitiveIterator.OfLong ofLong; @Override public long nextLong() { if (!hasNext()) throw new NoSuchElementException(); return ofLong.next(); } @Override public boolean hasNext() { while ((ofLong == null || !ofLong.hasNext()) && ofT.hasNext()) ofLong = mapper.apply(ofT.next()).iterator(); return ofLong != null && ofLong.hasNext(); } } return StreamSupport.longStream( Spliterators.spliteratorUnknownSize(new OfLong(), 0), false );
} @Override
public DoubleStream flatMapToDouble(Function<? super T. ? extends DoubleStream> mapper) { Objects.requireNonNull(mapper); class OfDouble implements PrimitiveIterator.OfDouble { private final Iterator<T> ofT = Streamer.this.iterator(); private PrimitiveIterator.OfDouble ofDouble; @Override public double nextDouble() { if (!hasNext()) throw new NoSuchElementException(); return ofDouble.next(); } @Override public boolean hasNext() { while ((ofDouble == null || !ofDouble.hasNext()) && ofT.hasNext()) ofDouble = mapper.apply(ofT.next()).iterator(); return ofDouble != null && ofDouble.hasNext(); } } return StreamSupport.doubleStream( Spliterators.spliteratorUnknownSize(new OfDouble(), 0), false );
}

Терминальные методы

Все терминальные методы стримера работают по общей последовательности действий: 1 — проверка корректности переданных аргументов (как правило — «не null»); 2 — проверка текущего состояния — должно быть WAITING, если ДА. То переводим стример в OPERATED. В противном случае бросаем исключение; 3 — выполнение требуемой логики метода и возврат результата; 4 — завершение стримера. Шаблон описанный выше выглядит так:

public ... someTerminalMethod(... args) { Objects.requireNonNull(args); //1 — проверка аргументов throwIfNotWaitingOrSetOperated(); //2 — проверка и переключение состояния try { ... //3 - выполнение требуемых действий... } finally { internalClose(); //4 — завершение стримера }
}

Еще, тут стоит добавить. Что внутри терминальных методов мы «вращаем» streamerIterator. Напомню — это наш «кран» из «водопроводной». А не «речной воды».

P.S., так же. Во всех конвеерных методах я заменил возвращаемый тип со Stream<T> на Streamer<T> т.к. это помогает избежать ненужных приведений типов в клиентском коде.

diff

P.P.S. так же. В этом коммите я покрыл тестами терминальные методы. Отчасти, логику работы этих методов можно понять по этим тестам.

Баги

В процессе написания этой публикации были обнаружены некоторые баги. Например такой код приведет к ClassCastException:

 Streamer<String> streamer = Streamer.of("10", "5". "15"); streamer.map(Integer::valueOf); //CCE не тут... streamer.forEach(System.out::println); //← а тут!

Это происходит потому. Что стример объявлен как Streamer<String>, но после вызова map() происходит смена типа на Streamer<Integer>. В forEach(), параметр action объявлен как Consumer<? super T>. На этапе компиляции. Компилятор неявно добавит приведение к типу: (String)streamerIterator.next() в методе forEach(), которое и приведет к CCE в момент выполнения.

Чтобы решить эту проблему. Можно воспользоваться способом. Которым реализован flatMap() — возвращать экземпляр нового стримера. Который связан с текущим. А текущий переводить в OPERATED. Похожим образом работает и оригинальный стрим. Но подробнее об этом поговорим позже. Для внедрения исправления. Необходимо всего лишь заменить:

return (Streamer<R>) this;

на

return (Streamer<R>)Streamer.of(this);

Тут порождаем новый стример. Завязанный (слинкованный) на «вызывающего». Кстати, вызов this.iterator() и переведет наш стример в OPERATED состояние. Поэтому даже об этом мы уже позаботились ранее 🙂 Теперь вызов forEach(), который сменил свой тип невозможен. Поскольку он в OPERATED состоянии. Но появляется новая проблема. Мы исправили CCE. Ценой размытого контракта. Этот конвеерный метод теперь является фабрикой. Пораждающей новый экземпляр. Но не билдером. Как остальные конвеерные методы. Что с этим делать. Обдумаем позже. А сейчас рассмотрим другой баг…

Код:

final AtomicInteger ai = new AtomicInteger();
Streamer.generate(ai::getAndIncrement).limit(10).forEach((v) -> System.out.print(v + " "));
System.out.print(ai);

Этот код выдает в результат:

0 1 2 3 4 5 6 7 8 9 11

В этой последовательности пропущен один элемент — «10». Потеря произошла по причине того. Что значение «10» было сгенерировано (т.е. закэшировано внутренним итератором). Но было отфильтровано операцией LimitOperation. Поэтому значение внешней переменной за циклом. Будет равно «11». Я исправил это следующим образом: LimitOperation начинает отбрасывать значения не при фактическом достижении лимита. А в момент. Когда итератор подошел к «границе» — когда пресечением лимита будет следующий. А не текущий элемент. Затем, там, где раньше он «отбрасывался». Теперь такой элемент пропускается. Но с установкой флага noNext = true, который и проверяем в дальнейшем перед генерацией элемента.

diff

Тестирование примерами

Числа Фибоначчи:

Streamer .iterate(new int[]{0,1}, ints -> new int[]{ints[1],ints[0] + ints[1]}) .limit(10) .mapToInt(ints -> ints[1]) .forEach(System.out::println); //1, 1, 2, 3, 5, 8, 13, 21, 34, 55

Сгенерируем пенсионеров:

Streamer .generate(PersonUtils::generateRandomPerson) .filter(person -> (person.gender == Person.Gender.MALE && person.age >= 63) || (person.gender == Person.Gender.FEMALE && person.age >= 58)) .limit(10) .sorted(Comparator.comparing(Person::getGender).thenComparingInt(Person::getAge).reversed()) //отсортируем сначала по полу. Затем по возрасту (в порядке убывания) .forEach(System.out::println); //выведем результат

Сгенерируем армейский призыв:

Streamer .generate(PersonUtils::generateRandomPerson) //сгенерируем персону .filter(person -> person.gender == Person.Gender.MALE) //отберем по полу .filter(person -> person.age >= 18 && person.age <= 27) //отберем по возрасту .limit(10) //остановим генерацию. Когда набрали 10 "кандидатов" .sorted(Comparator.comparingInt(Person::getAge).thenComparing(Person::getName)) //отсортируем сначала по возрасту. Затем по ФИО .forEach(System.out::println); //выведем результат

Гонки стримов

Ну вот и пришло время прощаться тестировать результат на скорость. Функциональное тестирование мы проводили во время разработки и относительно плотно его покрыли. Осталось проверить производительность… И так, устроим числовые гонки стримов… На трассу выходят три боллида: №1 — JDK IntStream; №2 — JDK stream (оригинал); №3 — Streamer; Задача — проехать на время прямую дистанцию. Длиной в 100000000 шагов итераций. На финише будем измерять время затраченое на это. Всё просто.

Код гонки:

public class SpeedRunner { public static void main(String[] args) { //CASE #1 - IntStream.iterate System.out.println("-----------------nCASE #1 - IntStream.iterate"); printStreamTiming(IntStream.iterate(1, i -> i + 1)); //CASE #2 - Stream.iterate System.out.println("-----------------nCASE #2 - Stream.iterate"); printStreamTiming(Stream.iterate(1, i -> i + 1)); //CASE #3 - Streamer.iterate System.out.println("-----------------nCASE #3 - Streamer.iterate"); printStreamTiming(Streamer.iterate(1, i -> i + 1)); } private static void printStreamTiming(Stream<Integer> stream) { long start = System.currentTimeMillis(); stream .filter(i -> i > 100000000) .limit(10) .forEach(i -> {}); long elapsed = System.currentTimeMillis() - start; System.out.printf("Elapsed time: %dms (%s)n", elapsed. New SimpleDateFormat("mm:ss.S").format(new Date(elapsed))); } private static void printStreamTiming(IntStream stream) { long start = System.currentTimeMillis(); stream .filter(i -> i > 100000000) .limit(10) .forEach(i -> {}); long elapsed = System.currentTimeMillis() - start; System.out.printf("Elapsed time: %dms (%s)n", elapsed. New SimpleDateFormat("mm:ss.S").format(new Date(elapsed))); }
}

Результат гонки:

-----------------
CASE #1 - IntStream.iterate
Elapsed time: 58ms (00:00.58)
-----------------
CASE #2 - Stream.iterate
Elapsed time: 582ms (00:00.582)
-----------------
CASE #3 - Streamer.iterate
Elapsed time: 7662ms (00:07.662)

Фаворитом гонки ожидаемо оказался IntStream из JDK — 58ms. Против 582ms у прибывшего вторым JDK Stream. Различие в скорости 10 раз! Поэтому, если вам потребуется работать с числами в стримах где важна скорость. Необходимо использовать IntStream. Вместо Stream<Integer>. Стример же прибыл к финишу третим. С результатом 7662ms. Что в 13 раз медленнее оригинала из JDK и в ~130 раз медленнее JDK IntStream`a. Результат не очень хороший. Но мне удалось буквально одним движением руки. Ускорить его примерно в семь!!! раз (1156ms). Сделал я это. Заменив return this; на return Streamer.of(this); в методе filter(). Т.е., с точки зрения производительности. Получилось выгоднее строить конвеер не из набора операций. А конвеер из связанных друг с другом стримеров. Где каждый выполняет одну отдельную операцию. Сам является как цепью. Так и отдельным ее звеном. Я не понимаю откуда такая разница. По моим соображениям такой подход наоборот должен замедлять производительность (и то не слишком заметно). Поскольку внутренний цикл должен работать быстрее чем цепной вызов связанных итераторов (по сути. Являющийся однонаправленным списком). Но это лишь теория. Практика показала обратное. По наличию свободного времени постараюсь разобраться и понять почему так.

Builder → Factory

Так же хочется отметить что замена применения вышеуказаного фикса — return this; на return Streamer.of(this); во всех терминальных методах приведет не только к ускорению стримера. Но так же смене контракта его терминальных методов. В данный момент он реализован по принципу билдера — возвращает экземпляр себя. Данная замена изменит это соглашение на фабрику — каждый вызов порождает новый экземпляр связанный с родительским. JDK стрим работает именно таким образом (фабрикой). Я свою рабочую копию оставлю на «медленном» варианте. Так как в противном случае. Чтобы было все по «уму». Придется так же переписать всю логику работы конвеера, например. Хранить не список операций List<IntermediateOperation>. А единичную операцию для данного стримера. А если идти еще более правильным путем. То реализовывать это нужно наследованием. Примерно с такой иерархией классов:

xxxStreamerOperation → AbstractStreamerOperation → AbstractStreamer → Stream…

Optional и null`ы

JDK Stream имеет такую особенность:

//NullPointerException изза Optional:
Stream.of(null, 1, 2, 3).findFirst();
Stream.of(null, 1, 2, 3).findAny();
Stream.of(1, null, 2, 3).min(Comparator.nullsFirst(Integer::compareTo));
Stream.of(1, null, 2, 3).max(Comparator.nullsLast(Integer::compareTo));
Stream.of(1, null, 2, 3).reduce((total. Curr) -> { Integer t = total + (curr == null ? 0 : curr); return (t == 6 ? null : t); });

В приведенных выше примерах источником NPE является Optional накладывающий дополнительные ограничения на вышеуказанные методы. Довольно неприятная вещь. Которую нужно держать в голове работая с терминальными методами. Возвращаемый тип которых Optional<>: findAny(). FindFirst(), min(). Max(), reduce()

Переименования

Так же решил переименовать некоторые методы/поля/классы. Например of подразумевает создание экземпляра на основе константных значений(я). Поэтому методы. Порождающие стример из источника. А не из значений я переименовал из of() в from(): from(Iterable<T>); from(Iterator<T>);

Так же, добавил методы порождения из мэпы:

from(Map<K,V>)Streamer<Map.Entry<K,V>> (стример из элементов мэпы) fromMapKeys(Map<K,V>) - Streamer<K> (стример из ключей мэпы) fromMapValues(Map<K,V>) - Streamer<V> (стример из значений мэпы)

Как всегда подтверждаю коммитом 🙂

Для меньшей путанности так же переименовал: externalIterator → sourceIterator; StreamerIterator → InternalStreamerIterator, милиция → полиция. Медики → медведики

Вишенка без тортика 🙂

В качестве десерта добавлю некоторые методы. Хотя их отсутствие вполне логично и решается другими средствами. Но все же представлю их:

public Number sum(Function<T, Number> toNumberMapper) { Objects.requireNonNull(toNumberMapper); throwIfNotWaitingOrSetOperated(); double doubleResult = 0d; long longResult = 0; try { while (streamerIterator.hasNext()) { T next = streamerIterator.next(); doubleResult+=toNumberMapper.apply(next).doubleValue(); longResult+=toNumberMapper.apply(next).longValue();; } if (longResult == doubleResult) { if (longResult <= Integer.MAX_VALUE && !(longResult < Integer.MIN_VALUE)) return (int)longResult; else return longResult; } else return doubleResult; } finally { internalClose(); }
} public Number sum() { return sum(o -> o != null ? (Number)o : 0);
}

Возвращает сумму чисел если стрим представлен числовыми значениями — Number, или ClassCastException в обратном случае. Функциональный но медленный способ. Не рекомендуется для enterprise. 🙂

Если flatMap() раскладывает элементы стрима на подмножества. То должен быть метод группирующий их. Встречайте:

public <K> Map<K,Collection<T>> groupBy(Function<? super T,? extends K> groupMapper) { return collect(HashMap::new, (map. Object) -> map.merge( groupMapper.apply(object). New ArrayList<>(Collections.singletonList(object)), ((left. Right) -> { left.addAll(right); return left; }) ), null);
}

Позволяет группировать элементы в Map. Где ключ (<K>) это группа. А значение (<V>) это список элементов этой группы.

Рассмотрим на примере. Допустим есть Streamer<Number> который содержит разные подклассы чисел (Number): Integer, Long. Double, Float. Сгруппируем значения этого стримера по их типу (классу). Затем «оформлено» выведем результат:

 //создадим стример из констант Streamer<Number> streamer = Streamer.of(1, 4L, 6.5d, 18.1f, 8, 15L, 16.111125d, 218.12f, 41, 45L, 1116.5d, 222.3f); //сгруппируем по классу Map<Class, Collection<Number>> groupedByClasses = streamer.groupBy(Number::getClass); //оформим вывод результата for (Class numberClass : groupedByClasses.keySet()) { System.out.println(numberClass.getSimpleName() + ":"); for (Number number : groupedByClasses.get(numberClass)) System.out.println("t" + number); }

Результат:

Long: 4 15 45
Double: 6.5 16.111125 1116.5
Float: 18.1 218.12 222.3
Integer: 1 8 41 Process finished with exit code 0

Или по признаку — четное/нечетное:

Map<Boolean, Collection<Number>> groupedByParity = streamer.groupBy(number -> (number.intValue() & 1) == 0); //вернет Boolean.TRUE для четного и Boolean.FALSE для НЕ четного number. По которому и «объединим»
System.out.println("Четные: " + groupedByParity.get(true));
System.out.println("Нечетные: " + groupedByParity.get(false));

П.С.ы.

Изменил поведение from(Iterable) — на «отложенный запуск». Теперь, при порождении из коллекции. Итератор этой коллекции запрашивается при запуске стримера в работу. Это позволяет не блокировать коллекцию на изменения до запуска стримера. И в случае изменения этой коллекции после создания стримера избежать ConcurrentModificationException.

Финал

По мере наличия у меня времени на эту работу. Я буду продолжать свои исследования в этой области и скорее всего репозиторий будет обновлятся. Если есть идеи выраженные в коде. Прошу их pull-реквестить :). Текстовые и теоретические идеи, замечания. Оскорбления. Допущеные мною ошибки и т.д. . Прошу в комменты. 🙂 П.П.С. текст по ходу статьи может незначительно отличаться от того что в репозитории. Т.к. в процессе были и ошибки и идеи. Но это не изменяет принципов описанных тут. Изменения не значительны.

Спасибо!

Читайте так же:

  • Наполнение сайта материаламиНаполнение сайта материалами Общественное наполнение возникает в результате строительства. Земляных работ. Реконструкции. Сноса и дорожных работ. Состоящая из горных пород. Бетона, асфальта. Щебня, кирпича. Камней и земли. Общественная засыпка пригодна для повторного использования при рекультивации и формировании […]
  • По подсчётам Counterpoint, доля Apple на рынке смартфонов в денежном выражении за год выросла с 34% до 41%По подсчётам Counterpoint, доля Apple на рынке смартфонов в денежном выражении за год выросла с 34% до 41% Вслед за своими коллегами из компании Canalys итоги второго квартала на мировом рынке смартфонов подвели аналитики Counterpoint Research. Однако если специалистов Canalys интересовали количественные показатели, то в отчёте Counterpoint рынок представлен как в натуральном, так и в […]
  • Размер контента сайтаРазмер контента сайта Раскрытиеинформации : “эта статья является личным мнением исследователей. Основанным на моем почти 20-летнем опыте. На этой странице нет никакой рекламы третьих лиц или монетизированных ссылок любого рода. Внешние ссылки на сторонние сайты модерируются мной. Отказот ответственности . “ […]
  • Автоматическая настройка рекламы на Турбо-страницах: новая опция в ВебмастереАвтоматическая настройка рекламы на Турбо-страницах: новая опция в Вебмастере Теперь монетизировать контентные Турбо-страницы можно без погружения в технические тонкости настроек рекламных блоков. Для этого в Вебмастере появилась автоматическая настройка рекламы на Турбо-страницах. Не нужно разбираться в деталях. Искать баланс рекламы […]