• Empleos
  • Sobre nosotros
  • Empleos
    • Inicio
    • Empleos
    • Cursos y retos
  • Empresas
    • Inicio
    • Publicar vacante
    • Nuestro proceso
    • Precios
    • Evaluaciones
    • Nómina
    • Blog
    • Comercial
    • Calculadora de salario

0

154
Vistas
Recopilador personalizado que no puede funcionar en paralelo

Creé un recopilador personalizado que usa un MessageDigest para crear un hash. En general, MessageDigest no funciona en paralelo. El problema que veo está en el método combiner() . No es posible combinar dos objetos MessageDigest. Cuando devuelvo nulo, parece funcionar, pero si lanzo una UnsupportedOperationException , falla. ¿Cuál es la forma típica de implementar un recopilador que no admite operaciones paralelas?

 class ChecksumCollector implements Collector<String, MessageDigest, ByteBuffer> { private String algorithm; ChecksumCollector(final String algorithm) { this.algorithm = algorithm; } @Override public Supplier<MessageDigest> supplier() { return () -> { try { return MessageDigest.getInstance(algorithm); } catch (NoSuchAlgorithmException e) { throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e); } }; } @Override public BiConsumer<MessageDigest, String> accumulator() { return (md, s) -> md.update(s.getBytes(StandardCharsets.UTF_8)); } @Override public BinaryOperator<MessageDigest> combiner() { return null; //seems to work but hash may not be correct? //throw new UnsupportedOperationException(LineDuplicationHash.class.getSimpleName() + " does not support parallel streams"); } @Override public Function<MessageDigest, ByteBuffer> finisher() { return md -> ByteBuffer.wrap(md.digest()); } @Override public Set<Characteristics> characteristics() { return Set.of(); } }
over 3 years ago · Santiago Trujillo
3 Respuestas
Responde la pregunta

0

Un BinaryOperator de Collector devuelto por combiner() solo se usará cuando se use para flujos paralelos, sin embargo, el método combiner() en sí mismo se invocará al llamar a Stream.collect() para recuperar ese combinador, en la implementación de JDK (consulte ReduceOps.makeRef(Collector) ).

Por lo tanto, tiene 2 opciones:

  • devuelva null , lo que provocaría una NullPointerException si su recopilador se usa en un Stream paralelo, en el momento en que se necesita usar el combinador;
  • o devolver un BinaryOperator que en realidad lanza la excepción cuando se llama:
 return (a, b) -> throw new UnsupportedOperationException( LineDuplicationHash.class.getSimpleName() + " does not support parallel streams");

Esta segunda opción sería más amigable para el desarrollador desconocido que luego cambia su canalización para que sea paralelo.

over 3 years ago · Santiago Trujillo Denunciar

0

Aunque no tiene sentido trabajar con MessageDigest en paralelo, pero si hubiera alguna operación en la canalización que podría paralelizarse (y el conjunto de datos tiene el potencial de ser lo suficientemente grande como para pagar los gastos generales del uso del flujo paralelo ) puede considerar la opción de crear dos versiones de este recopilador.

El remedio para su implementación secuencial ya se proporciona en la respuesta de Didier . Proporcionar un operador binario adecuado que arroje una excepción es mucho mejor que simplemente devolver null .

Para la versión paralela de este recopilador, los datos de la transmisión se pueden recopilar inicialmente en un contenedor mutable auxiliar y, luego, la función de finisher completará el MessageDigest con datos del contenedor. Quiero reiterar que tendrá sentido solo si la collect está presidida por algunas operaciones que podrían paralelizarse y la entrada es lo suficientemente significativa; de lo contrario, solo causará gastos generales innecesarios.

Así es como podría verse un colector paralelo

 Collector<String, ?, ByteBuffer> checksum = Collectors.collectingAndThen(Collectors.toList(), list -> digestAndWrap(list, "SHA-512"));
 private static ByteBuffer digestAndWrap(List<String> list, String algorithm) { MessageDigest md; try { md = MessageDigest.getInstance(algorithm); } catch (NoSuchAlgorithmException e) { throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e); } for (String next: list) { md.update(next.getBytes(StandardCharsets.UTF_8)); } return ByteBuffer.wrap(md.digest()); }
over 3 years ago · Santiago Trujillo Denunciar

0

Si desea que esto funcione, debe recuperar la asociatividad reduciendo indirectamente sobre las funciones en lugar de directamente sobre el resumen del mensaje. Prueba esto:

 Collector<String, ?, ByteBuffer> checksumCollector = collectingAndThen( reducing( Function.identity(), s -> md -> { md.update(s.getBytes(StandardCharsets.UTF_8)); return md; }, (BinaryOperator<Function<MessageDigest, MessageDigest>>) Function::andThen), endo -> ByteBuffer.wrap(endo.apply(getMessageDigest("SHA-256")).digest()));

Donde getMessageDigest() es un pequeño método auxiliar que se ocupa de la excepción marcada:

 static MessageDigest getMessageDigest(String algorithm) { try { return MessageDigest.getInstance(algorithm); } catch (NoSuchAlgorithmException e) { throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e); } }

Efectivamente, esto difiere el cálculo real del resumen del mensaje al finalizador.

over 3 years ago · Santiago Trujillo Denunciar
Responde la pregunta
Encuentra empleos remotos

¡Descubre la nueva forma de encontrar empleo!

Top de empleos
Top categorías de empleo
Empresas
Publicar vacante Precios Nuestro proceso Comercial
Legal
Términos y condiciones Política de privacidad
© 2025 PeakU Inc. All Rights Reserved.

Andres GPT

Recomiéndame algunas ofertas
Necesito ayuda