Sin embargo, hoy estaba usando una secuencia que realizaba una operación parallel()
después de un mapa; la fuente subyacente es un iterador que no es seguro para subprocesos, que es similar a la implementación de BufferedReader.lines .
Sin embargo, originalmente pensé que se llamaría a trySplit en el hilo creado; Observé que los accesos al iterador provienen de múltiples hilos.
Por ejemplo, la siguiente implementación tonta del iterador solo se configura con suficientes elementos para causar la división y también realiza un seguimiento de los subprocesos únicos que accedieron al método hasNext
.
class SillyIterator implements Iterator<String> { private final ArrayDeque<String> src = IntStream.range(1, 10000) .mapToObj(Integer::toString) .collect(toCollection(ArrayDeque::new)); private Map<String, String> ts = new ConcurrentHashMap<>(); public Set<String> threads() { return ts.keySet(); } private String nextRecord = null; @Override public boolean hasNext() { var n = Thread.currentThread().getName(); ts.put(n, n); if (nextRecord != null) { return true; } else { nextRecord = src.poll(); return nextRecord != null; } } @Override public String next() { if (nextRecord != null || hasNext()) { var rec = nextRecord; nextRecord = null; return rec; } throw new NoSuchElementException(); } }
Usando esto para crear una secuencia de la siguiente manera:
var iter = new SillyIterator(); StreamSupport .stream(Spliterators.spliteratorUnknownSize( iter, Spliterator.ORDERED | Spliterator.NONNULL ), false) .map(n -> "value = " + n) .parallel() .collect(toList()); System.out.println(iter.threads());
Esto en mi sistema genera los dos subprocesos de unión de bifurcación, así como el subproceso principal, lo que me asustó un poco.
[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main]
La seguridad de subprocesos no implica necesariamente que solo un subproceso acceda al mismo. El aspecto importante es que no hay acceso concurrente, es decir, no hay acceso por más de un subproceso al mismo tiempo. Si el acceso por diferentes subprocesos está ordenado temporalmente y este orden también garantiza la visibilidad de memoria necesaria, que es responsabilidad de la persona que llama, todavía es un uso seguro para subprocesos.
La documentación Spliterator
dice:
A pesar de su utilidad obvia en algoritmos paralelos, no se espera que los divisores sean seguros para subprocesos; en cambio, las implementaciones de algoritmos paralelos que usan spliterators deberían garantizar que el spliterator solo lo use un subproceso a la vez. En general, esto es fácil de lograr a través del confinamiento de subprocesos en serie , que a menudo es una consecuencia natural de los algoritmos paralelos típicos que funcionan mediante descomposición recursiva.
El spliterator no necesita estar confinado al mismo subproceso a lo largo de su vida útil, pero debe haber un traspaso claro en el lado de la persona que llama para garantizar que el subproceso anterior deje de usarlo antes de que el nuevo subproceso comience a usarlo.
Pero lo importante es que el divisor no necesita ser seguro para subprocesos, por lo tanto, el iterador envuelto por un divisor tampoco necesita ser seguro para subprocesos.
Tenga en cuenta que un comportamiento típico es dividir y entregar antes de iniciar el recorrido, pero dado que un Iterator
normal no admite la división, el divisor envolvente tiene que iterar y almacenar en búfer los elementos para implementar la división. Por lo tanto, el Iterator
experimenta el recorrido de diferentes subprocesos (pero uno a la vez) cuando el recorrido no se ha iniciado desde la perspectiva de la implementación de Stream
.
Dicho esto, la implementación de lines()
de BufferedReader
es un mal ejemplo que no debe seguir. Dado que se centra en una sola llamada readLine()
, sería natural implementar Spliterator
directamente en lugar de implementar un Iterator
más complicado y ajustarlo a través spliteratorUnknownSize(…)
.
Dado que su ejemplo también se centra en una sola llamada poll()
, también es sencillo implementar Spliterator
directamente:
class SillySpliterator extends Spliterators.AbstractSpliterator<String> { private final ArrayDeque<String> src = IntStream.range(1, 10000) .mapToObj(Integer::toString).collect(toCollection(ArrayDeque::new)); SillySpliterator() { super(Long.MAX_VALUE, ORDERED | NONNULL); } @Override public boolean tryAdvance(Consumer<? super String> action) { String nextRecord = src.poll(); if(nextRecord == null) return false; action.accept(nextRecord); return true; } }
Dependiendo de su caso de la vida real, también puede pasar el tamaño real de deque al constructor y proporcionar la característica SIZED
.
Entonces, puedes usarlo como
var result = StreamSupport.stream(new SillySpliterator(), true) .map(n -> "value = " + n) .collect(toList());