Очень много способов ускорить Pandas
Иногда не используя Pandas
Содержание:
· 1. Простые методы
∘ Типы данных
∘ Забыть про CSV
∘ Векторизация
∘ Numpy
∘ JIT-компиляторы
· 2. Параллелизация
∘ Сторонние пакеты
· Итог
Аналитики и DS так или иначе сталкиваются с пакетом Pandas. В нем можно джойнить данные из разных БД, читать множество форматов файлов, строить простые графики без использования спец-пакетов и делать предобработку данных для ML пакетов.
К сожалению на больших объемах данных работа пакета медленная. Сегодня мы разберем простые и действенные методы, позволяющие ускорить выполнение кода на pandas в десятки, сотни и даже тысячи раз. Операции, которые будут разбираться, достаточно стандартные: чтение файла, применение простой функции, симуляции А/А-тестов.
1. Простые методы
Начнем с простых методов, не требующих установки доп. пакетов и не предъявляющих каких либо требований к железу.
Типы данных
Первый способ пришел к нам их мира баз данных. В pandas колонка может иметь один из нескольких типов данных:
object
— строки и сложные объектыstring
— с версии Pandas 1.0.0int
— целые числаfloat
— floating-point numbersbool
— булевые значения,True
илиFalse
datetime
— Даты и времяtimedelta
—интервал между двумяdatetime
category
— категориальный тип, более эффективный чем строки
Почему это важно? Если мы читаем данные через read_sql из БД или csv/txt файлов, числа могут автоматически определиться как строка, например “123456789”. Это проблема, т.к. в строке каждый символ весит 1 байт, т.е. данное число будет занимать в памяти 9 байт. Числа же в двоичной системе занимают от 2х до 8 байт в зависимости от числа: int16
содержит числа в диапазоне [-32,768 ; +32,767] , int32
— [-2,147,483,648 ; 2,147,483,647] и весит 32 бита (4 байта), int64
весит 8 байт и содержит диапазон за который ты скорее всего никогда не выйдешь. Похожим образом работают и вещественные числа (float32
и float64
).
Аналогично числам, иногда запись подразумевает только значения ложь/правда или дату/время, которые весят всего лишь 1 и 4/8 байт соответсвенно. Также, можно перевести строки в категории.
Чем меньше вес, тем быстрее обработка. Поэтому правильным подходом является преобразование данных в нужный тип. Для этого используются функции .to_numeric() , .to_datetime(), а также .astype().
Также, можно сделать downcast числовых типов до меньшего диапозона там где это возможно. Для этого воспользуемся кодом с этого kernel’a на kaggle:
Забыть про CSV
Часто аналитики сохраняют промежуточные результаты работы в файлы CSV. Это неплохая практика, поскольку позволяет не ждать пока данные повторно выгрузятся/посчитаются. Однако формат для это цели выбран ужасный.
Это текстовый формат, который хранит числа в строковом формате. Значит, файлы также как в разделе выше занимают излишнее место и имеют проблемы с производительностью при чтении/записи. Есть 2 альтернативы CSV, которые позволяют работать гораздо быстрее.
Во-первых, модуль pickle из стандартной библиотеки python. Он нужен для сериализации/десериализации объектов в поток байтов, который можно записать в файл. В примере ниже мы записываем датафрейм df в файл data.pickle и затем читаем его обратно.
import pickle
# пишем
with open('data.pickle', 'wb') as f:
pickle.dump(df, f)
# читаем
with open('data.pickle', 'rb') as f:
df = pickle.load(f)
Сам pandas имеет встроенные методы для pickle, также как и numpy.
Во-вторых, можно использовать формат parquet. Этот свободный формат данных первоначально создавался для hadoop, но может применятьcя и в классической файловой системе со многими языками программирования. Обеспечивает очень хорошее сжатие, а также сохранения типов данных. Пример:
# вначале ставим пакет
# pip install pyarrow
import pyarrow as pa
import pyarrow.parquet as pq
# пишем
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(table, 'data.parquet')
# читаем
table2 = pq.read_table('data.parquet')
table2.to_pandas()
# можно читать только отдельные колонки
pq.read_table('data.parquet', columns=['column1', 'column3'])
Из плюсов parquet — это колоночный формат (прям как некоторые БД), и можно читать отдельно только некоторые столбцы. Похожая фишка — партиционированные запись и чтение. Суть данного подхода в том, что можно разбить датафрейм на файлы по значению какой-либо колонки. Например, для колонки event_date
партиционированная запись по ней сделает по папке на каждую дату, и в каждой папке будет файл за определенную дату. При этом, прочитать такой комбинированный parquet можно как за определенные даты, так и целиком. В первом случае будет прирост производительности.
import pyarrow as pa
import pyarrow.parquet as pq
# пишем
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_to_dataset(table, root_path='dataset_name',
partition_cols=['column1', 'column2'])
# читаем
df = pq.read_table('dataset_name/column1=xyz/column2=ij').to_pandas()
Parquet работает гораздо быстрее csv, но медленнее pickle, хотя это нивелируется партиционированнием. При этом размер parquet будет в разы меньше CSV и pickle.
Единственная проблема, которая возникает и с pickle, и с parquet — эти форматы не human-readable, в отличии от CSV который можно открыть в любом текстовом редакторе или Excel. Если все же нужен CSV, можно использовать сжатие/архивацию.
df.to_csv('data.csv.gz', compression='gzip')
os.path.getsize('data.csv.gz')
df = pd.read_csv('data.csv.gz', compression='gzip', index_col=0)
Векторизация
Это первый метод, который необходимо знать при работе с pandas, а его игнорирование обычно приводит к проваленным собеседованиям.
Задача такая: нужно применить некоторую функцию к каждой записи. Очевидный способ, который делают новички — цикл по строкам или конкретному столбцу. Однако это антипатерн в pandas, работающий неприлично медленно на больших датафреймах. Разберем другие способы с примерами.
Например, итерация по строкам с помощью метода .iterrows(). Это наиболее медленный способ, к тому же не сохраняет типы данных. Другие варианты — использовать .itertuples(), где на каждой итерации строка рассматривается как именованный tupple. Это во много раз быстрее, чем .iterrows(). Еще один аналог — .iteritems().
Любые итерации все равно на порядки медленнее векторизованного подхода, поэтому использовать их стоит только в редких случаях, например когда результат зависит от предыдущих строк. Другой метод — использование функции .apply(). Она принимает на вход функцию и доп. параметры, и затем применяет ее к каждой строке. Это более предпочтительный способ, работающий в разы быстрее. Также, apply лаконичнее и удобнее, особенно если применять lambda-функции.
Однако, современные процессоры научились оптимизировать подобные задачи с помощью SIMD-инструкций, в которых операции производятся над вектором, а не одним значением (как это происходит когда мы итерируемся по строкам). Чтобы использовать эти инструкции, нужно явно вызвать их в пакете. Поэтому pandas содержит собственные реализации простых операций (сумма, min/max и тд), выполняющиеся гораздо быстрее итерирования. Такие функции называют векторизированными. Прежде чем использовать apply или iter…, стоит поискать в документации соответствующие векторные функции. Для строк и дат есть свои методы, например df['col'].str.contains('pat')
и df['col'].dt.days
.
Ниже сравнение времени работы методов выше для операции добавления столбца-логарифма. Результаты ошеломляющие, векторизация быстрее циклов и iterrows в тысячу раз! Похожее сравнение можно прочитать тут.
import numpy as np
import pandas as pd
import math
df = pd.DataFrame(data={'values':range(1,100_000)})
temp=[]
# -------------------------------------------------
# 1.15 секунды
for idx in range(0, df.shape[0], 1):
temp.append(math.log(df['values'].iloc[idx]))
# 7.18 секунд
for i,row in df.iterrows():
temp.append(math.log(row['values']))
# 156 миллисекунд
for row in df.itertuples():
temp.append(math.log(row.values))
# 84.6 миллисекунды
temp = df['values'].apply(lambda x: math.log(x))
# 3.38 миллисекунды
temp = np.log(df['values'])
# -------------------------------------------------
df['new_values'] = temp
Numpy
Pandas работает поверх numpy, т.е. последний использует более простые структуры и значит более легковесный и быстрый. К тому же, numpy использует оптимизированные пре-скомпилированные методы. Поэтому чтобы ускорить pandas можно вовсе отказаться от него в пользу numpy.
Чтобы перевести pandas-датафрейм в numpy, можно воспользоваться функцией .to_numpy() (или устаревшей .values), которая вернет двумерный ndarray. Хотя к колонкам в массиве можно обращаться только по индексу, можно реализовать подход с именами почти как в pandas. Для этого переводим датафрейм в массив, затем создаем словарь с именами колонок.
import pandas as pd
import numpy as np
df_pandas = pd.DataFrame(
data=[[0,1], [2,3], [4,5]],
columns = ['First Column', 'Second Column']
)
df_numpy = df_pandas.to_numpy()
cols_dict = dict(zip(
df_pandas.columns,
list(range(0,len(df_pandas.columns)))
))
# first row
df_numpy[0, cols_dict['Second Column']] = 9
В numpy для массивов есть те же арифметические функции, что и в pandas. Часто даже не нужно переводить датафрейм в ndarray, а применять функции именно к нему. К слову, иногда в pandas приходится использовать функции numpy (см. логарифмы выше). Если нужная функция не ищется, можно использовать np.vectorize(), которая сама попробует векторизовать функцию наподобие .apply().
Разберем 2 примера, когда функции numpy отрабатывают быстрее чем apply в pandas.
Во-первых, аналог функции coalesce из SQL. Помимо уже знакомого нам метода .apply() у pandas есть своя векторизованная функция combine_first(), которую мы сравним с .where() из numpy.
# использум датафрейм из кода выше
df = pd.DataFrame()
df['operation'] = df_pandas['operation'].tolist() * 1_000_000
df['First Column'] = df_pandas['First Column'].tolist() * 1_000_000
df['Second Column'] = df_pandas['Second Column'].tolist() * 1_000_000
# -------------------------------------------------
# pandas .apply()
def my_func(row):
if row['operation'] is not None:
return row['operation']
elif row['First Column'] is not None:
return row['First Column']
else:
return row['Second Column']
df['coalesce'] = df.apply(my_func, axis=1)
# 40.9 s
# -------------------------------------------------
# -------------------------------------------------
# pandas .combine_first()
df['coalesce'] = (
df['operation']
.combine_first(df['First Column'])
.combine_first(df['Second Column'])
)
# 411 ms
# -------------------------------------------------
# -------------------------------------------------
# numpy .where()
df['coalesce'] = np.where(
df['operation'].values == None,
df['First Column'].values,
df['operation'].values
)
# 148 ms
# -------------------------------------------------
Как видим, .where() в разы быстрее, а вариант с apply не стоит даже рассматривать на таком объеме данных. Если нужно использовать множество условий, есть функция np.select().
Второй пример — симуляция А/А-тестов. Возьмем средних размеров датафрейм и будем много раз разбивать его на 2 части, сравнивать их между собой и записывать p-value в массив. Методов сплитования достаточно как в numpy, так и в pandas, а также в sklearn. Более того, в numpy есть как устаревшие методы через RandomState, так и аналогичные новые методы через Generator. Ниже симуляции:
import numpy as np
import pandas as pd
import scipy.stats as stats
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
df_pandas = pd.DataFrame(
data=zip(
np.random.normal(loc=0, scale=1, size=1_000_000),
np.random.normal(loc=1, scale=1, size=1_000_000)
),
columns = ['metric1', 'metric2']
)
# -------------------------------------------------
pvalues = []
for i in tqdm(range(1000)):
df_a = df_pandas.sample(frac = 0.5, random_state=i)['metric1']
df_b = df_pandas.drop(df_a.index)['metric1']
_, p = stats.ttest_ind(df_a, df_b)
pvalues.append(p)
# 3min 19s
# -------------------------------------------------
# -------------------------------------------------
pvalues = []
df_arr = df_pandas.to_numpy()[:,0]
for i in tqdm(range(1000)):
x = np.random.uniform(size=df_arr.shape[0]) < 0.5
sample1 = df_arr[x]
sample2 = df_arr[~x]
_, p = stats.ttest_ind(sample1, sample2)
pvalues.append(p)
# 33.9 s
# -------------------------------------------------
# -------------------------------------------------
pvalues = []
df_arr = df_pandas.to_numpy()[:,0]
for i in tqdm(range(1000)):
x = np.random.permutation(df_arr)
sample1 = x[50000:]
sample2 = x[:50000]
_, p = stats.ttest_ind(sample1, sample2)
pvalues.append(p)
# 31.5 s
# -------------------------------------------------
# -------------------------------------------------
pvalues = []
df_arr = df_pandas.to_numpy()[:,0]
rng = np.random.default_rng()
for i in tqdm(range(1000)):
x = rng.permutation(df_arr)
sample1 = x[50000:]
sample2 = x[:50000]
_, p = stats.ttest_ind(sample1, sample2)
pvalues.append(p)
# 26.9 s
# -------------------------------------------------
# -------------------------------------------------
pvalues = []
df_arr = df_pandas.to_numpy()[:,0]
for i in tqdm(range(1000)):
s1, s2 = train_test_split(df_arr, test_size=0.5)
_, p = stats.ttest_ind(s1, s2)
pvalues.append(p)
# 45.1 s
# -------------------------------------------------
# -------------------------------------------------
pvalues = []
df_arr = df_pandas.to_numpy()[:,0]
for i in tqdm(range(1000)):
s1 = np.random.choice(range(df_arr.shape[0]), int(df_arr.shape[0] / 2), replace=False)
s2 = list(set(range(df_arr.shape[0])) - set(s1))
sample1 = df_arr[s1]
sample2 = df_arr[s2]
_, p = stats.ttest_ind(sample1, sample2)
pvalues.append(p)
# 13min 41s
# -------------------------------------------------
# -------------------------------------------------
pvalues = []
df_arr = df_pandas.to_numpy()[:,0]
rng = np.random.default_rng()
for i in tqdm(range(1000)):
s1 = rng.choice(range(df_arr.shape[0]), int(df_arr.shape[0] / 2), replace=False)
s2 = list(set(range(df_arr.shape[0])) - set(s1))
sample1 = df_arr[s1]
sample2 = df_arr[s2]
_, p = stats.ttest_ind(sample1, sample2)
pvalues.append(p)
# 13min 19s
# -------------------------------------------------
Ожидаемо numpy и sklearn быстрее pandas. Лучшие результаты у permutation (причем новая рандомизация быстрее старой), а choice выдал неадекватно плохой результат. Если нужно не просто разбить датафрейм на два, а делать выборки бутстрапом, можно использовать устар. функцию randint или новый генератор в numpy вместо sample в pandas. Также, в numpy есть многопоточная генерация случайных чисел.
Однако, не для всех операций нужно выбирать numpy. Чтение CSV в pandas гораздо быстрее, чем в numpy или стандартном python. Методы genfromtxt из numpy стоит использовать только если не хватает гибкости pd.read_csv(). Ссылка на пример в самом конце.
JIT-компиляторы
Еще один способ сделать применение функций через apply быстрее — скомплилировать их. Мануал pandas дает 3 метода для этого. Первым идет Cython. Простой способ: импортировать ipython magic %%cython
в начале ячейки с объявлениями функций и дальше комплятор сделает все сам. Более сложные оптимизации могут дать значительный эффект, но требуют знания языка C.
Второй метод — eval(). Он также работает автоматически. Нужно обернуть в этот метод математические вычисления в pandas. Для других операций это метод не работает.
Третий способ — пакет Numba. При объявлении функции необходимо указать декоратор @jit
, также можно указать параметры для параллельной обработки (так например можно генерить случайные числа в numpy). Но о параллелизации и сторонних пакетах мы подробно поговорим в следующем блоке.
2. Параллелизация
Большинство современных компьютеров имеют процессор с более чем одним ядром, т.е. с возможностью делать несколько вычислений одновременно. Более того, часто аналитики и DS работают на сервере (например, c JupyterHub), у которого число ядер может достигать сотен.
Изначально интерпретатор python — однопоточная программа с глобальным локом во время вычисления, а значит в ванильном “pandas” нельзя параллелить задачи. Однако, в стандартной библиотеке python есть несколько модулей, позволяющих работать с многопоточностью, и далее мы разберем их применение.
Первый модуль — multiprocessing. Он обходит глобальный лок, работая не через под-потоки, а через под-процессы. Основной его параметр — это кол-во процессов, на которые будет биться основной. Обычно его выбирают равным кол-ву ядер в процессоре, но я советую брать число на 1 меньше, оставляя одно ядро на накладные расходы и синхронизацию. Далее создается пул воркеров (процессов), на которые будут параллелиться вычисления. Есть несколько вариантов, как разбить по ним датафрейм, но обычно используется .array_split():
import pandas as pd
import multiprocessing as mp
def your_datarame_func(df):
...
n_cores = max(mp.cpu_count() - 1, 1)
p = mp.Pool(n_cores) # Data parallelism Object
def parallelize_dataframe(df, func, n_cores):
df_split = np.array_split(df, n_cores)
df = pd.concat( pool.map(func, df_split) )
pool.close() ; pool.join()
return df
df_results = parallelize_dataframe(df, func=your_datarame_func)
Более высокоуровневый интерфейс предоставляет другой пакет из стандартной библиотеки — concurrent.futures, однако возможностей у него меньше. Он предоставляет 2 аналогичных API для работы с процессами и тредами — ProcessPoolExecutor и ThreadPoolExecutor.
import psutil
import pandas as pd
import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed
def your_datarame_func(df):
...
num_procs = max(psutil.cpu_count(logical=True) - 1, 1)
splitted_df = np.array_split(df, num_procs)
df_results = []
with ProcessPoolExecutor(max_workers=num_procs) as executor:
results = [executor.submit(your_datarame_func, df=df) for df in splitted_df]
for result in as_completed(results):
try:
df_results.append(result.result())
except Exception as ex:
print(str(ex))
pass
df_results = pd.concat(df_results)
В concurrent.futures можно получить pid порождаемых процессов (пример). Также, можно использовать сторонние пакеты для параллельных вычислений.
Сторонние пакеты
Первый пакет, который мы рассмотрим — Modin, поскольку он относительно зрелый (первый релиз в середине 2018). На сайте сообщается, что скорость растет почти линейно с увеличение кол-ва ядер. Modin имеет минимальный порог вхождения т.к. почти не требует изменять код. Вместо import pandas as pd
достаточно прописать import modin.pandas as pd
. Около 80% функций Pandas API поддерживается modin. При этом, надо выбрать какой движок будет обеспечивать параллельную работу — Dask или Ray (при установке пишем !pip install modin[dask]
или !pip install modin[ray]
). Однако, на бенчмарках Modin не всегда быстрее “ванильного” pandas, и на операциях вроде удаления дубликатов работает медленнее. Думаю что это происходит из-за операций, аналогичных shuffle в hadoop.
Существует вариант для тех у кого хорошая видеокарта. Пакет cudf работает поверх CUDA, и аналогично Modin достаточно изменить только импорт import cudf as pd
, но также не все функции pandas поддерживаются. Этот пакет работает только с одним GPU, для нескольких — пакет dask-cudf. Однако, не стоит слишком надеяться на cuDF, поскольку он выигрывает у modin в только в операциях group by, а в других медленнее даже “ванильного” pandas.
Еще один пакет требующий лишь небольшого изменения кода — Swifter. Он ускоряет apply, используя по своему усмотрению или векторизацию, или распараллеливание через Dask, или чистый pandas. Кстати, он работает как с классическим pandas, так и с датафреймами Modin. Его использование достаточно простое:
# обычный apply
df['new_col'] = df.apply(lambda x: random_func(x['col1'], x['col2']))
# apply с помощью swifter
import swifter
df['new_col'] = df.swifter.apply(lambda x: random_func(x['col1'], x['col2']))
Еще один пакет, заточенный именно под .apply() — Pandarallel. В отличии от Swifter, он использует голый multiprocessing вместо Dask и не пытается в векторизацию, поэтому в основном проигрывает по скорости. Пакет очень простой, достаточно простого примера из документации:
from pandarallel import pandarallel
pandarallel.initialize(progress_bar=True)
# обычный apply
df.apply(func)
# apply с помощью pandarallel
df.parallel_apply(func)
Следующий пакет — Polars, изначально написанный для Rust но имеющий python-обертку. В отличии от Modin, его использование потребует изменение кода, а сам синтаксис скорее напоминает пакет dplyr для языка R. Пакет предоставляет два API: Eager API и Lazy API (читай мануал). Eager API работает по похожему с Pandas принципу (результаты сразу после запуска). Lazy API похож на Spark, где сначала происходит планирование пайплайна, а выполнение только после операций типа collect (вывод на экран и тд). При этом используются все доступные ядра. Основные операции, на которых будет заметное увеличение скорости — группировка и джойны. К сожалению, этот пакет также не имеет аналогов всех фукнций в pandas, но всегда вернуться к нему с помощью функции .to_pandas().
Другой пример R-like кода для ускорения — порт пакета data.table на python, datatable. Он дает хороший прирост по производительности, но работает не поверх, а вместо pandas, и требует достаточного времени на изучение.
Если данных так много что одна машина точно не справится, можно параллелить на несколько. Во-первых, можно использовать уже упомянутый Dask. Во-вторых, есть Pandas API для Spark (пример).
import pyspark.pandas as ps
Однако, работа с такими массивами данных все же не задача для Pandas. Я советую использовать вышеупомянутые пакеты для данных, которые уже слишком большие для ванильного pandas, но еще слишком маленькие для Spark’а. Для работы с big-data стеком стоит выучить стандартные для него инструменты, а не мучить pandas костылями.
В интернете и на medium в частности достаточно громких цифр про то, во сколько раз данные методы ускоряют pandas. Но надо понимать, что это зависит от твоих данных и операций, которые будут применяться, поэтому точного ответа нет. Могу рекомендовать широкое исследование от h2o.ai, по результатам которого выиграл Polars. Но для каждой задачи ты должен сам выбрать, какой метод применять, а данная статья дает лишь старт для этого. На машине с одним процессором (например google colab) все выглядит иначе, вот мой пример применения чтения CSV и сплитования для А/В с помощью хэш-функции.
Итог
Главное, что нужно понимать — не стоит применять оптимизацию там, где она не нужна, особенно если выигрыш по времени сильно меньше лага на изучение нового инструмента. Pandas довольно удобен, и на небольших данных я рекомендую использовать именно его. Вызов любого стороннего инструмента поверх pandas имеет некий оверхэд по производительности. В случае Modin, у которого свой класс dataframe, этот оверхэд просто огромен.
Выводы статьи, которые стоит вынести для себя:
- Premature optimization is the root of all evil © Дональд Кнут.
- Всегда следи за типами данных и с каким форматом файлов работаешь.
- Для простой оптимизации начни с использования инструментов, доступных по умолчанию (numpy, multiprocessing). Затем пробуй простые надстройки над pandas (swifter, modin) и только если будешь неудовлетворен используй более сложные методы (dask, polars).
P.S.: Если тебе понравилась стать, в качестве благодарности можешь купить мне кофе на https://www.donationalerts.com/r/stats_data_ninja или https://buymeacoffee.com/koch.