• Jobs
  • About Us
  • professionals
    • Home
    • Jobs
    • Courses and challenges
  • business
    • Home
    • Post vacancy
    • Our process
    • Pricing
    • Assessments
    • Payroll
    • Blog
    • Sales
    • Salary Calculator

0

248
Views
Codificador para conjuntos de datos Spark de tipo fila

Me gustaría escribir un codificador para un tipo de fila en DataSet, para una operación de mapa que estoy haciendo. Esencialmente, no entiendo cómo escribir codificadores.

A continuación se muestra un ejemplo de una operación de mapa:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

 Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() { @Override public Iterator<String> call(Row row) throws Exception { ArrayList<String> obj = //some map operation return obj.iterator(); } },Encoders.STRING());

Entiendo que, en lugar de una cadena, el codificador debe escribirse de la siguiente manera:

 Encoder<Row> encoder = new Encoder<Row>() { @Override public StructType schema() { return join.schema(); //return null; } @Override public ClassTag<Row> clsTag() { return null; } };

Sin embargo, no entiendo el clsTag() en el codificador, y estoy tratando de encontrar un ejemplo en ejecución que pueda demostrar algo similar (es decir, un codificador para un tipo de fila)

Editar: esta no es una copia de la pregunta mencionada: error del codificador al intentar asignar la fila del marco de datos a la fila actualizada, ya que la respuesta habla sobre el uso de Spark 1.x en Spark 2.x (no lo estoy haciendo), también estoy buscando para un codificador para una clase de fila en lugar de resolver un error. Finalmente, estaba buscando una solución en Java, no en Scala.

over 3 years ago · Santiago Trujillo
2 answers
Answer question

0

La respuesta es usar un RowEncoder y el esquema del conjunto de datos usando StructType .

A continuación se muestra un ejemplo práctico de una operación de mapa plano con conjuntos de datos:

 StructType structType = new StructType(); structType = structType.add("id1", DataTypes.LongType, false); structType = structType.add("id2", DataTypes.LongType, false); ExpressionEncoder<Row> encoder = RowEncoder.apply(structType); Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() { @Override public Iterator<Row> call(Row row) throws Exception { // a static map operation to demonstrate List<Object> data = new ArrayList<>(); data.add(1l); data.add(2l); ArrayList<Row> list = new ArrayList<>(); list.add(RowFactory.create(data.toArray())); return list.iterator(); } }, encoder);
over 3 years ago · Santiago Trujillo Report

0

Tuve el mismo problema... Encoders.kryo(Row.class)) funcionó para mí.

Como beneficio adicional, los documentos de ajuste de Apache Spark se refieren a Kryo, ya que es más rápido en la serialización "a menudo hasta 10 veces":

https://spark.apache.org/docs/latest/tuning.html

over 3 years ago · Santiago Trujillo Report
Answer question
Find remote jobs

Discover the new way to find a job!

Top jobs
Top job categories
Business
Post vacancy Pricing Our process Sales
Legal
Terms and conditions Privacy policy
© 2025 PeakU Inc. All Rights Reserved.

Andres GPT

Recommend me some offers
I have an error