Custom Collector that cannot work in parallel

I have written a custom collector that uses a MessageDigest to create a hash. MessageDigest generally cannot be used in parallel, and the issue I'm seeing is in the combiner() method: it is not possible to combine two MessageDigest objects. When I return null it seems to work, but if I throw an UnsupportedOperationException it fails. What is the typical way to implement a collector that doesn't support parallel operation?

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;

class ChecksumCollector implements Collector<String, MessageDigest, ByteBuffer> {
    private final String algorithm;

    ChecksumCollector(final String algorithm) {
        this.algorithm = algorithm;
    }

    @Override
    public Supplier<MessageDigest> supplier() {
        return () -> {
            try {
                return MessageDigest.getInstance(algorithm);
            } catch (NoSuchAlgorithmException e) {
                throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e);
            }
        };
    }

    @Override
    public BiConsumer<MessageDigest, String> accumulator() {
        return (md, s) -> md.update(s.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public BinaryOperator<MessageDigest> combiner() {
        return null; //seems to work but hash may not be correct?
        //throw new UnsupportedOperationException(LineDuplicationHash.class.getSimpleName() + " does not support parallel streams");
    }

    @Override
    public Function<MessageDigest, ByteBuffer> finisher() {
        return md -> ByteBuffer.wrap(md.digest());
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Set.of();
    }
}
over 3 years ago · Santiago Trujillo
3 Answers


The BinaryOperator returned by a Collector's combiner() is only *used* for parallel streams; however, the combiner() method itself is invoked every time Stream.collect() runs, because the JDK implementation retrieves the combiner up front (see ReduceOps.makeRef(Collector)).

You thus have two options:

  • either return null, which causes a NullPointerException at the moment the combiner is actually needed, i.e. when your collector is used in a parallel stream;
  • or return a BinaryOperator that throws the exception when called:
return (a, b) -> {
    throw new UnsupportedOperationException(
        LineDuplicationHash.class.getSimpleName() + " does not support parallel streams");
};

The second option is friendlier to an unsuspecting developer who later makes your pipeline parallel, since it fails with a descriptive message rather than a bare NullPointerException.
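The difference can be sketched with a small self-contained demo (the class name SequentialOnlyDemo and the use of Collector.of instead of a full Collector implementation are mine; the collector itself mirrors the one in the question):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.stream.Collector;
import java.util.stream.Stream;

class SequentialOnlyDemo {

    // Same shape as the question's collector, built with Collector.of,
    // but with a combiner that throws instead of returning null.
    static Collector<String, MessageDigest, ByteBuffer> checksum(String algorithm) {
        return Collector.of(
                () -> {
                    try {
                        return MessageDigest.getInstance(algorithm);
                    } catch (NoSuchAlgorithmException e) {
                        throw new UnsupportedOperationException(
                                "Could not find MessageDigest for algorithm " + algorithm, e);
                    }
                },
                (md, s) -> md.update(s.getBytes(StandardCharsets.UTF_8)),
                (a, b) -> {
                    throw new UnsupportedOperationException(
                            "checksum collector does not support parallel streams");
                },
                md -> ByteBuffer.wrap(md.digest()));
    }

    public static void main(String[] args) {
        // Sequential: the combiner is retrieved by collect() but never invoked.
        ByteBuffer digest = Stream.of("a", "b", "c").collect(checksum("SHA-256"));
        System.out.println("SHA-256 digest length: " + digest.remaining()); // prints 32

        // Parallel: if the stream is actually split, the combiner is invoked and throws.
        try {
            Stream.of("a", "b", "c").parallel().collect(checksum("SHA-256"));
            System.out.println("input was not split, combiner never called");
        } catch (UnsupportedOperationException e) {
            System.out.println("parallel stream rejected: " + e.getMessage());
        }
    }
}
```

Note that for very small inputs the parallel stream may not be split at all, in which case the combiner is never invoked; the exception is a guard, not a guarantee.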



Although it doesn't make sense to update a MessageDigest in parallel, if there are other operations in the pipeline that can be parallelized (and the data set is large enough to pay for the overhead of a parallel stream), you may consider creating two versions of this collector.

The remedy for your sequential implementation is already given in Didier's answer: providing a combiner that throws an exception is far better than simply returning null.

For the parallel version of this collector, data from the stream can first be collected into an auxiliary mutable container, and the finisher function then feeds the MessageDigest from that container. To reiterate: this only makes sense if collect is preceded by operations that can be parallelized and the input is significant enough; otherwise it just adds unnecessary overhead.

Here is how such a parallel-friendly collector might look:

Collector<String, ?, ByteBuffer> checksum =
        Collectors.collectingAndThen(Collectors.toList(),
                                     list -> digestAndWrap(list, "SHA-512"));

private static ByteBuffer digestAndWrap(List<String> list, String algorithm) {
    MessageDigest md;
    try {
        md = MessageDigest.getInstance(algorithm);
    } catch (NoSuchAlgorithmException e) {
        throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e);
    }
    for (String next : list) {
        md.update(next.getBytes(StandardCharsets.UTF_8));
    }
    return ByteBuffer.wrap(md.digest());
}
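A minimal runnable sketch of this approach (the class name ParallelChecksumDemo, the SHA-256 algorithm choice, and the map(String::trim) stand-in for "parallelizable work" are mine):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.stream.Collectors;

class ParallelChecksumDemo {

    // Same digestAndWrap as in the answer above.
    static ByteBuffer digestAndWrap(List<String> list, String algorithm) {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance(algorithm);
        } catch (NoSuchAlgorithmException e) {
            throw new UnsupportedOperationException(
                    "Could not find MessageDigest for algorithm " + algorithm, e);
        }
        for (String next : list) {
            md.update(next.getBytes(StandardCharsets.UTF_8));
        }
        return ByteBuffer.wrap(md.digest());
    }

    // Collect in parallel into a List (Collectors.toList preserves encounter
    // order for ordered streams), then digest sequentially in the finisher.
    static ByteBuffer parallelChecksum(List<String> data) {
        return data.parallelStream()
                .map(String::trim) // stand-in for some parallelizable work
                .collect(Collectors.collectingAndThen(
                        Collectors.toList(),
                        list -> digestAndWrap(list, "SHA-256")));
    }

    public static void main(String[] args) {
        List<String> data = List.of("foo", "bar", "baz");
        ByteBuffer parallel = parallelChecksum(data);

        // Same bytes digested directly, without streams, for comparison.
        ByteBuffer direct = digestAndWrap(data, "SHA-256");
        System.out.println(parallel.equals(direct)); // prints true
    }
}
```

Because the list preserves encounter order, the parallel pipeline produces exactly the same digest as a direct sequential pass over the data.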


If you want to make this work in parallel, you need to regain associativity by reducing indirectly over functions instead of directly over the message digest. Function composition is associative, so the combiner becomes legal. Try this:

Collector<String, ?, ByteBuffer> checksumCollector = collectingAndThen(
    reducing(
        Function.identity(),
        s -> md -> {
            md.update(s.getBytes(StandardCharsets.UTF_8));
            return md;
        },
        (BinaryOperator<Function<MessageDigest, MessageDigest>>) Function::andThen),
    endo -> ByteBuffer.wrap(endo.apply(getMessageDigest("SHA-256")).digest()));

Where getMessageDigest() is a small helper method taking care of the checked exception:

static MessageDigest getMessageDigest(String algorithm) {
    try {
        return MessageDigest.getInstance(algorithm);
    } catch (NoSuchAlgorithmException e) {
        throw new UnsupportedOperationException("Could not find MessageDigest for algorithm " + algorithm, e);
    }    
}

Effectively this defers the actual calculation of the message digest to the finisher.
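Put together as a runnable sketch (the class name EndoCollectorDemo is mine; assigning the reducing collector to an explicitly typed local variable avoids the cast on Function::andThen):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class EndoCollectorDemo {

    static MessageDigest getMessageDigest(String algorithm) {
        try {
            return MessageDigest.getInstance(algorithm);
        } catch (NoSuchAlgorithmException e) {
            throw new UnsupportedOperationException(
                    "Could not find MessageDigest for algorithm " + algorithm, e);
        }
    }

    static Collector<String, ?, ByteBuffer> checksumCollector(String algorithm) {
        // Reduce over MessageDigest -> MessageDigest functions: composing
        // with andThen is associative, so parallel reduction is legal.
        Collector<String, ?, Function<MessageDigest, MessageDigest>> endo =
                Collectors.reducing(
                        Function.identity(),
                        s -> md -> {
                            md.update(s.getBytes(StandardCharsets.UTF_8));
                            return md;
                        },
                        Function::andThen);
        // The digest itself is only computed once, in the finisher.
        return Collectors.collectingAndThen(
                endo,
                f -> ByteBuffer.wrap(f.apply(getMessageDigest(algorithm)).digest()));
    }

    public static void main(String[] args) {
        List<String> data = List.of("hello", "world");
        ByteBuffer sequential = data.stream().collect(checksumCollector("SHA-256"));
        ByteBuffer parallel = data.parallelStream().collect(checksumCollector("SHA-256"));
        // Ordered streams preserve encounter order even in parallel reductions.
        System.out.println(sequential.equals(parallel)); // prints true
    }
}
```

The composed function is applied left-to-right (a.andThen(b) applies a first), matching encounter order, so sequential and parallel runs yield the same digest.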
