• Jobs
  • About Us
  • professionals
    • Home
    • Jobs
    • Courses and challenges
  • business
    • Home
    • Post vacancy
    • Our process
    • Pricing
    • Assessments
    • Payroll
    • Blog
    • Sales
    • Salary Calculator

0

1.4K
Views
Cómo pasar el marco de datos de pandas a las tareas de flujo de aire

Estoy aprendiendo a usar el flujo de aire para crear una canalización de aprendizaje automático.

Pero no encontró una manera de pasar el marco de datos de pandas generado a partir de 1 tarea a otra tarea ... ¿Parece que necesita convertir los datos al formato JSON o guardar los datos en la base de datos dentro de cada tarea?

Finalmente, tuve que poner todo en 1 tarea... ¿Hay alguna forma de pasar el marco de datos entre las tareas de flujo de aire?

Aquí está mi código:

 from datetime import datetime import pandas as pd import numpy as np import os import lightgbm as lgb from sklearn.model_selection import train_test_split from sklearn.model_selection import StratifiedKFold from sklearn.metrics import balanced_accuracy_score from airflow.decorators import dag, task from airflow.operators.python_operator import PythonOperator @dag(dag_id='super_mini_pipeline', schedule_interval=None, start_date=datetime(2021, 11, 5), catchup=False, tags=['ml_pipeline']) def baseline_pipeline(): def all_in_one(label): path_to_csv = os.path.join('~/airflow/data','leaf.csv') df = pd.read_csv(path_to_csv) y = df[label] X = df.drop(label, axis=1) folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=10) lgbm = lgb.LGBMClassifier(objective='multiclass', random_state=10) metrics_lst = [] for train_idx, val_idx in folds.split(X, y): X_train, y_train = X.iloc[train_idx], y.iloc[train_idx] X_val, y_val = X.iloc[val_idx], y.iloc[val_idx] lgbm.fit(X_train, y_train) y_pred = lgbm.predict(X_val) cv_balanced_accuracy = balanced_accuracy_score(y_val, y_pred) metrics_lst.append(cv_balanced_accuracy) avg_performance = np.mean(metrics_lst) print(f"Avg Performance: {avg_performance}") all_in_one_task = PythonOperator(task_id='all_in_one_task', python_callable=all_in_one, op_kwargs={'label':'species'}) all_in_one_task # dag invocation pipeline_dag = baseline_pipeline()
about 3 years ago · Santiago Trujillo
2 answers
Answer question

0

Aunque se usa en muchas tareas de ETL, Airflow no es la opción correcta para ese tipo de operaciones, está destinado al flujo de trabajo, no al flujo de datos. Pero hay muchas formas de hacerlo sin pasar todo el marco de datos entre tareas.

Puede pasar información sobre los datos usando xcom.push y xcom.pull:

una. Guarde el resultado de la primera tarea en algún lugar (json, csv, etc.)

b. Pase a xcom.push información sobre el archivo guardado. Por ejemplo, nombre de archivo, ruta.

C. Lea este nombre de archivo usando xcom.pull de la otra tarea y realice la operación necesaria.

O:

Todo lo anterior usando algunas tablas de base de datos:

una. En task_1 puede descargar datos de table_1 en algún marco de datos, procesarlos y guardarlos en otra table_2 (df.to_sql()).

b. Pase el nombre de la tabla usando xcom.push.

C. De la otra tarea, obtenga table_2 usando xcom.pull y léalo con df.read_sql().

Puede obtener información sobre cómo usar xcom en los ejemplos de flujo de aire. Ejemplo: https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial_etl_dag.py

En mi humilde opinión, hay muchas otras formas mejores, acabo de escribir lo que probé.

about 3 years ago · Santiago Trujillo Report

0

Completamente de acuerdo con @Talgat en que Airflow no está realmente diseñado para esto. Se centra en las dependencias de tareas en lugar de las dependencias de datos.

¿Quizás pueda buscar una solución de canalización centrada en datos como ZenML para resolver este problema? Tiene una guía con ejemplos de pasar Pandas Dataframes a través de los pasos de la canalización. También puede aprovechar el almacenamiento en caché de datos en los pasos y otras características que lo hacen más adecuado para lo que está haciendo.

Además, una canalización de ZenML también se puede implementar como un DAG de Airflow . Entonces, en lugar de enfocarse en escribir usted mismo la lógica persistente del artefacto, puede dejar que ZenML lo maneje.

Descargo de responsabilidad: soy uno de los principales contribuyentes de ZenML, por lo que es cierto que esto está sesgado. ¡Todavía pensé que podría ser útil para el OP!

about 3 years ago · Santiago Trujillo Report
Answer question
Find remote jobs

Discover the new way to find a job!

Top jobs
Top job categories
Business
Post vacancy Pricing Our process Sales
Legal
Terms and conditions Privacy policy
© 2025 PeakU Inc. All Rights Reserved.

Andres GPT

Recommend me some offers
I have an error