Estoy tratando de ingerir algunos archivos y cada uno de ellos se lee como una cadena de una sola columna (lo que se espera ya que es un archivo de ancho fijo) y tengo que dividir ese valor único en diferentes columnas. Esto significa que debo acceder al marco de datos pero debo usar writeStream ya que es un marco de datos de transmisión. Este es un ejemplo de la entrada:
"64 Apple 32.32128Orange12.1932 Banana 2.45"
Marco de datos esperado:
64, Apple, 32.32 128, Orange, 12.19 32, Banana, 2.45
Observe cómo cada columna tiene la misma cantidad de caracteres (3,6,5) <-Esto es lo que tiene META_SIZE. Por lo tanto, cada fila tiene 14 caracteres cada una (suma de columnas).
Intenté usar forEach como el siguiente ejemplo, pero no está haciendo nada:
two_d = [] streamingDF = ( spark.readStream.format("cloudFiles") .option("encoding", sourceEncoding) .option("badRecordsPath", badRecordsPath) .options(**cloudfiles_config) .load(sourceBasePath) ) def process_row(string): rows = round(len(string)/chars_per_row) for i in range(rows): current_index = 0 two_d.append([]) for j in range(len(META_SIZES)): two_d[i].append(string[(i*chars_per_row+current_index) : (i*chars_per_row+current_index+META_SIZES[j])].strip()) current_index += META_SIZES[j] print(two_d[i]) query = streamingDF.writeStream.foreach(process_row).start()
Probablemente haga un withColumn para agregarlos en lugar de la lista o usar esa lista y convertirla en un marco de datos de transmisión si es posible y mejor.
Editar: agregué un ejemplo de entrada y expliqué META_SIZES
Suponiendo que las entradas son algo como lo siguiente.
... "64 Apple 32.32" "128 Orange 12.19" "32 Banana 2.45" ...
Puedes hacerlo.
streamingDF = ( spark.readStream.format("cloudFiles") .option("encoding", sourceEncoding) .option("badRecordsPath", badRecordsPath) .options(**cloudfiles_config) .load(sourceBasePath) ) #remove this line if strings are already utf-8 lines = stream_lines.select(stream_lines['value'].cast('string')) lengths = (lines.withColumn('Count', functions.split(lines['value'], ' ').getItem(0)) .withColumn('Fruit', functions.split(lines['value'], ' ').getItem(1) .withColumn('Price', functions.split(lines['value'], ' ').getItem(1))
Tenga en cuenta que "valor" se establece como el nombre de columna predeterminado al leer una cadena con readStream. Si clouds_config contiene algo que cambia el nombre de la columna de la entrada, deberá modificar el nombre de la columna en el código anterior.