Creé un recopilador personalizado que usa un MessageDigest para crear un hash. En general, MessageDigest no funciona en paralelo. El problema que veo está en el método combiner()
. No es posible combinar dos objetos MessageDigest. Cuando devuelvo nulo, parece funcionar, pero si lanzo una UnsupportedOperationException
, falla. ¿Cuál es la forma típica de implementar un recopilador que no admite operaciones paralelas?
class ChecksumCollector implements Collector<String, MessageDigest, ByteBuffer> { private String algorithm; ChecksumCollector(final String algorithm) { this.algorithm = algorithm; } @Override public Supplier<MessageDigest> supplier() { return () -> { try { return MessageDigest.getInstance(algorithm); } catch (NoSuchAlgorithmException e) { throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e); } }; } @Override public BiConsumer<MessageDigest, String> accumulator() { return (md, s) -> md.update(s.getBytes(StandardCharsets.UTF_8)); } @Override public BinaryOperator<MessageDigest> combiner() { return null; //seems to work but hash may not be correct? //throw new UnsupportedOperationException(LineDuplicationHash.class.getSimpleName() + " does not support parallel streams"); } @Override public Function<MessageDigest, ByteBuffer> finisher() { return md -> ByteBuffer.wrap(md.digest()); } @Override public Set<Characteristics> characteristics() { return Set.of(); } }
Un BinaryOperator
de Collector
devuelto por combiner()
solo se usará cuando se use para flujos paralelos, sin embargo, el método combiner()
en sí mismo se invocará al llamar a Stream.collect()
para recuperar ese combinador, en la implementación de JDK (consulte ReduceOps.makeRef(Collector)
).
Por lo tanto, tiene 2 opciones:
null
, lo que provocaría una NullPointerException
si su recopilador se usa en un Stream paralelo, en el momento en que se necesita usar el combinador;BinaryOperator
que en realidad lanza la excepción cuando se llama: return (a, b) -> throw new UnsupportedOperationException( LineDuplicationHash.class.getSimpleName() + " does not support parallel streams");
Esta segunda opción sería más amigable para el desarrollador desconocido que luego cambia su canalización para que sea paralelo.
Aunque no tiene sentido trabajar con MessageDigest en paralelo, pero si hubiera alguna operación en la canalización que podría paralelizarse (y el conjunto de datos tiene el potencial de ser lo suficientemente grande como para pagar los gastos generales del uso del flujo paralelo ) puede considerar la opción de crear dos versiones de este recopilador.
El remedio para su implementación secuencial ya se proporciona en la respuesta de Didier . Proporcionar un operador binario adecuado que arroje una excepción es mucho mejor que simplemente devolver null
.
Para la versión paralela de este recopilador, los datos de la transmisión se pueden recopilar inicialmente en un contenedor mutable auxiliar y, luego, la función de finisher
completará el MessageDigest
con datos del contenedor. Quiero reiterar que tendrá sentido solo si la collect
está presidida por algunas operaciones que podrían paralelizarse y la entrada es lo suficientemente significativa; de lo contrario, solo causará gastos generales innecesarios.
Así es como podría verse un colector paralelo
Collector<String, ?, ByteBuffer> checksum = Collectors.collectingAndThen(Collectors.toList(), list -> digestAndWrap(list, "SHA-512"));
private static ByteBuffer digestAndWrap(List<String> list, String algorithm) { MessageDigest md; try { md = MessageDigest.getInstance(algorithm); } catch (NoSuchAlgorithmException e) { throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e); } for (String next: list) { md.update(next.getBytes(StandardCharsets.UTF_8)); } return ByteBuffer.wrap(md.digest()); }
Si desea que esto funcione, debe recuperar la asociatividad reduciendo indirectamente sobre las funciones en lugar de directamente sobre el resumen del mensaje. Prueba esto:
Collector<String, ?, ByteBuffer> checksumCollector = collectingAndThen( reducing( Function.identity(), s -> md -> { md.update(s.getBytes(StandardCharsets.UTF_8)); return md; }, (BinaryOperator<Function<MessageDigest, MessageDigest>>) Function::andThen), endo -> ByteBuffer.wrap(endo.apply(getMessageDigest("SHA-256")).digest()));
Donde getMessageDigest()
es un pequeño método auxiliar que se ocupa de la excepción marcada:
static MessageDigest getMessageDigest(String algorithm) { try { return MessageDigest.getInstance(algorithm); } catch (NoSuchAlgorithmException e) { throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e); } }
Efectivamente, esto difiere el cálculo real del resumen del mensaje al finalizador.