Очень много способов ускорить Pandas

Stats&Data ninja
12 min readDec 7, 2022

--

Иногда не используя Pandas

Аналитики и DS так или иначе сталкиваются с пакетом Pandas. В нем можно джойнить данные из разных БД, читать множество форматов файлов, строить простые графики без использования спец-пакетов и делать предобработку данных для ML пакетов.

К сожалению на больших объемах данных работа пакета медленная. Сегодня мы разберем простые и действенные методы, позволяющие ускорить выполнение кода на pandas в десятки, сотни и даже тысячи раз. Операции, которые будут разбираться, достаточно стандартные: чтение файла, применение простой функции, симуляции А/А-тестов.

1. Простые методы

Начнем с простых методов, не требующих установки доп. пакетов и не предъявляющих каких либо требований к железу.

Типы данных

Первый способ пришел к нам их мира баз данных. В pandas колонка может иметь один из нескольких типов данных:

  • object — строки и сложные объекты
  • stringс версии Pandas 1.0.0
  • int — целые числа
  • float — floating-point numbers
  • bool — булевые значения, True или False
  • datetime— Даты и время
  • timedelta—интервал между двумяdatetime
  • category — категориальный тип, более эффективный чем строки

Почему это важно? Если мы читаем данные через read_sql из БД или csv/txt файлов, числа могут автоматически определиться как строка, например “123456789”. Это проблема, т.к. в строке каждый символ весит 1 байт, т.е. данное число будет занимать в памяти 9 байт. Числа же в двоичной системе занимают от 2х до 8 байт в зависимости от числа: int16 содержит числа в диапазоне [-32,768 ; +32,767] , int32 — [-2,147,483,648 ; 2,147,483,647] и весит 32 бита (4 байта), int64 весит 8 байт и содержит диапазон за который ты скорее всего никогда не выйдешь. Похожим образом работают и вещественные числа (float32 и float64).

Аналогично числам, иногда запись подразумевает только значения ложь/правда или дату/время, которые весят всего лишь 1 и 4/8 байт соответсвенно. Также, можно перевести строки в категории.

Чем меньше вес, тем быстрее обработка. Поэтому правильным подходом является преобразование данных в нужный тип. Для этого используются функции .to_numeric() , .to_datetime(), а также .astype().

Также, можно сделать downcast числовых типов до меньшего диапозона там где это возможно. Для этого воспользуемся кодом с этого kernel’a на kaggle:

Код 1. Оптимизаций занимаемой датафреймом памяти

Забыть про CSV

Часто аналитики сохраняют промежуточные результаты работы в файлы CSV. Это неплохая практика, поскольку позволяет не ждать пока данные повторно выгрузятся/посчитаются. Однако формат для это цели выбран ужасный.

Это текстовый формат, который хранит числа в строковом формате. Значит, файлы также как в разделе выше занимают излишнее место и имеют проблемы с производительностью при чтении/записи. Есть 2 альтернативы CSV, которые позволяют работать гораздо быстрее.

Во-первых, модуль pickle из стандартной библиотеки python. Он нужен для сериализации/десериализации объектов в поток байтов, который можно записать в файл. В примере ниже мы записываем датафрейм df в файл data.pickle и затем читаем его обратно.

import pickle

# пишем
with open('data.pickle', 'wb') as f:
pickle.dump(df, f)

# читаем
with open('data.pickle', 'rb') as f:
df = pickle.load(f)

Сам pandas имеет встроенные методы для pickle, также как и numpy.

Во-вторых, можно использовать формат parquet. Этот свободный формат данных первоначально создавался для hadoop, но может применятьcя и в классической файловой системе со многими языками программирования. Обеспечивает очень хорошее сжатие, а также сохранения типов данных. Пример:

# вначале ставим пакет
# pip install pyarrow

import pyarrow as pa
import pyarrow.parquet as pq

# пишем
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(table, 'data.parquet')

# читаем
table2 = pq.read_table('data.parquet')
table2.to_pandas()

# можно читать только отдельные колонки
pq.read_table('data.parquet', columns=['column1', 'column3'])

Из плюсов parquet — это колоночный формат (прям как некоторые БД), и можно читать отдельно только некоторые столбцы. Похожая фишка — партиционированные запись и чтение. Суть данного подхода в том, что можно разбить датафрейм на файлы по значению какой-либо колонки. Например, для колонки event_date партиционированная запись по ней сделает по папке на каждую дату, и в каждой папке будет файл за определенную дату. При этом, прочитать такой комбинированный parquet можно как за определенные даты, так и целиком. В первом случае будет прирост производительности.

import pyarrow as pa
import pyarrow.parquet as pq

# пишем
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_to_dataset(table, root_path='dataset_name',
partition_cols=['column1', 'column2'])

# читаем
df = pq.read_table('dataset_name/column1=xyz/column2=ij').to_pandas()

Parquet работает гораздо быстрее csv, но медленнее pickle, хотя это нивелируется партиционированнием. При этом размер parquet будет в разы меньше CSV и pickle.

Единственная проблема, которая возникает и с pickle, и с parquet — эти форматы не human-readable, в отличии от CSV который можно открыть в любом текстовом редакторе или Excel. Если все же нужен CSV, можно использовать сжатие/архивацию.

df.to_csv('data.csv.gz', compression='gzip')
os.path.getsize('data.csv.gz')

df = pd.read_csv('data.csv.gz', compression='gzip', index_col=0)

Векторизация

Это первый метод, который необходимо знать при работе с pandas, а его игнорирование обычно приводит к проваленным собеседованиям.

Задача такая: нужно применить некоторую функцию к каждой записи. Очевидный способ, который делают новички — цикл по строкам или конкретному столбцу. Однако это антипатерн в pandas, работающий неприлично медленно на больших датафреймах. Разберем другие способы с примерами.

Например, итерация по строкам с помощью метода .iterrows(). Это наиболее медленный способ, к тому же не сохраняет типы данных. Другие варианты — использовать .itertuples(), где на каждой итерации строка рассматривается как именованный tupple. Это во много раз быстрее, чем .iterrows(). Еще один аналог — .iteritems().

Любые итерации все равно на порядки медленнее векторизованного подхода, поэтому использовать их стоит только в редких случаях, например когда результат зависит от предыдущих строк. Другой метод — использование функции .apply(). Она принимает на вход функцию и доп. параметры, и затем применяет ее к каждой строке. Это более предпочтительный способ, работающий в разы быстрее. Также, apply лаконичнее и удобнее, особенно если применять lambda-функции.

Однако, современные процессоры научились оптимизировать подобные задачи с помощью SIMD-инструкций, в которых операции производятся над вектором, а не одним значением (как это происходит когда мы итерируемся по строкам). Чтобы использовать эти инструкции, нужно явно вызвать их в пакете. Поэтому pandas содержит собственные реализации простых операций (сумма, min/max и тд), выполняющиеся гораздо быстрее итерирования. Такие функции называют векторизированными. Прежде чем использовать apply или iter…, стоит поискать в документации соответствующие векторные функции. Для строк и дат есть свои методы, например df['col'].str.contains('pat') и df['col'].dt.days.

Ниже сравнение времени работы методов выше для операции добавления столбца-логарифма. Результаты ошеломляющие, векторизация быстрее циклов и iterrows в тысячу раз! Похожее сравнение можно прочитать тут.

import numpy as np
import pandas as pd
import math

df = pd.DataFrame(data={'values':range(1,100_000)})

temp=[]

# -------------------------------------------------
# 1.15 секунды
for idx in range(0, df.shape[0], 1):
temp.append(math.log(df['values'].iloc[idx]))

# 7.18 секунд
for i,row in df.iterrows():
temp.append(math.log(row['values']))

# 156 миллисекунд
for row in df.itertuples():
temp.append(math.log(row.values))

# 84.6 миллисекунды
temp = df['values'].apply(lambda x: math.log(x))

# 3.38 миллисекунды
temp = np.log(df['values'])
# -------------------------------------------------

df['new_values'] = temp

Numpy

Pandas работает поверх numpy, т.е. последний использует более простые структуры и значит более легковесный и быстрый. К тому же, numpy использует оптимизированные пре-скомпилированные методы. Поэтому чтобы ускорить pandas можно вовсе отказаться от него в пользу numpy.

Чтобы перевести pandas-датафрейм в numpy, можно воспользоваться функцией .to_numpy() (или устаревшей .values), которая вернет двумерный ndarray. Хотя к колонкам в массиве можно обращаться только по индексу, можно реализовать подход с именами почти как в pandas. Для этого переводим датафрейм в массив, затем создаем словарь с именами колонок.

import pandas as pd
import numpy as np

df_pandas = pd.DataFrame(
data=[[0,1], [2,3], [4,5]],
columns = ['First Column', 'Second Column']
)
df_numpy = df_pandas.to_numpy()

cols_dict = dict(zip(
df_pandas.columns,
list(range(0,len(df_pandas.columns)))
))

# first row
df_numpy[0, cols_dict['Second Column']] = 9

В numpy для массивов есть те же арифметические функции, что и в pandas. Часто даже не нужно переводить датафрейм в ndarray, а применять функции именно к нему. К слову, иногда в pandas приходится использовать функции numpy (см. логарифмы выше). Если нужная функция не ищется, можно использовать np.vectorize(), которая сама попробует векторизовать функцию наподобие .apply().

Разберем 2 примера, когда функции numpy отрабатывают быстрее чем apply в pandas.

Во-первых, аналог функции coalesce из SQL. Помимо уже знакомого нам метода .apply() у pandas есть своя векторизованная функция combine_first(), которую мы сравним с .where() из numpy.

# использум датафрейм из кода выше
df = pd.DataFrame()
df['operation'] = df_pandas['operation'].tolist() * 1_000_000
df['First Column'] = df_pandas['First Column'].tolist() * 1_000_000
df['Second Column'] = df_pandas['Second Column'].tolist() * 1_000_000

# -------------------------------------------------
# pandas .apply()
def my_func(row):
if row['operation'] is not None:
return row['operation']
elif row['First Column'] is not None:
return row['First Column']
else:
return row['Second Column']

df['coalesce'] = df.apply(my_func, axis=1)
# 40.9 s
# -------------------------------------------------

# -------------------------------------------------
# pandas .combine_first()
df['coalesce'] = (
df['operation']
.combine_first(df['First Column'])
.combine_first(df['Second Column'])
)
# 411 ms
# -------------------------------------------------


# -------------------------------------------------
# numpy .where()
df['coalesce'] = np.where(
df['operation'].values == None,
df['First Column'].values,
df['operation'].values
)
# 148 ms
# -------------------------------------------------

Как видим, .where() в разы быстрее, а вариант с apply не стоит даже рассматривать на таком объеме данных. Если нужно использовать множество условий, есть функция np.select().

Второй пример — симуляция А/А-тестов. Возьмем средних размеров датафрейм и будем много раз разбивать его на 2 части, сравнивать их между собой и записывать p-value в массив. Методов сплитования достаточно как в numpy, так и в pandas, а также в sklearn. Более того, в numpy есть как устаревшие методы через RandomState, так и аналогичные новые методы через Generator. Ниже симуляции:

import numpy as np
import pandas as pd
import scipy.stats as stats
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm

df_pandas = pd.DataFrame(
data=zip(
np.random.normal(loc=0, scale=1, size=1_000_000),
np.random.normal(loc=1, scale=1, size=1_000_000)
),
columns = ['metric1', 'metric2']
)

# -------------------------------------------------
pvalues = []
for i in tqdm(range(1000)):
df_a = df_pandas.sample(frac = 0.5, random_state=i)['metric1']
df_b = df_pandas.drop(df_a.index)['metric1']
_, p = stats.ttest_ind(df_a, df_b)
pvalues.append(p)
# 3min 19s
# -------------------------------------------------


# -------------------------------------------------
pvalues = []
df_arr = df_pandas.to_numpy()[:,0]
for i in tqdm(range(1000)):
x = np.random.uniform(size=df_arr.shape[0]) < 0.5
sample1 = df_arr[x]
sample2 = df_arr[~x]
_, p = stats.ttest_ind(sample1, sample2)
pvalues.append(p)
# 33.9 s
# -------------------------------------------------


# -------------------------------------------------
pvalues = []
df_arr = df_pandas.to_numpy()[:,0]
for i in tqdm(range(1000)):
x = np.random.permutation(df_arr)
sample1 = x[50000:]
sample2 = x[:50000]
_, p = stats.ttest_ind(sample1, sample2)
pvalues.append(p)
# 31.5 s
# -------------------------------------------------


# -------------------------------------------------
pvalues = []
df_arr = df_pandas.to_numpy()[:,0]
rng = np.random.default_rng()
for i in tqdm(range(1000)):
x = rng.permutation(df_arr)
sample1 = x[50000:]
sample2 = x[:50000]
_, p = stats.ttest_ind(sample1, sample2)
pvalues.append(p)
# 26.9 s
# -------------------------------------------------


# -------------------------------------------------
pvalues = []
df_arr = df_pandas.to_numpy()[:,0]
for i in tqdm(range(1000)):
s1, s2 = train_test_split(df_arr, test_size=0.5)
_, p = stats.ttest_ind(s1, s2)
pvalues.append(p)
# 45.1 s
# -------------------------------------------------


# -------------------------------------------------
pvalues = []
df_arr = df_pandas.to_numpy()[:,0]
for i in tqdm(range(1000)):
s1 = np.random.choice(range(df_arr.shape[0]), int(df_arr.shape[0] / 2), replace=False)
s2 = list(set(range(df_arr.shape[0])) - set(s1))
sample1 = df_arr[s1]
sample2 = df_arr[s2]
_, p = stats.ttest_ind(sample1, sample2)
pvalues.append(p)
# 13min 41s
# -------------------------------------------------


# -------------------------------------------------
pvalues = []
df_arr = df_pandas.to_numpy()[:,0]
rng = np.random.default_rng()
for i in tqdm(range(1000)):
s1 = rng.choice(range(df_arr.shape[0]), int(df_arr.shape[0] / 2), replace=False)
s2 = list(set(range(df_arr.shape[0])) - set(s1))
sample1 = df_arr[s1]
sample2 = df_arr[s2]
_, p = stats.ttest_ind(sample1, sample2)
pvalues.append(p)
# 13min 19s
# -------------------------------------------------

Ожидаемо numpy и sklearn быстрее pandas. Лучшие результаты у permutation (причем новая рандомизация быстрее старой), а choice выдал неадекватно плохой результат. Если нужно не просто разбить датафрейм на два, а делать выборки бутстрапом, можно использовать устар. функцию randint или новый генератор в numpy вместо sample в pandas. Также, в numpy есть многопоточная генерация случайных чисел.

Однако, не для всех операций нужно выбирать numpy. Чтение CSV в pandas гораздо быстрее, чем в numpy или стандартном python. Методы genfromtxt из numpy стоит использовать только если не хватает гибкости pd.read_csv(). Ссылка на пример в самом конце.

JIT-компиляторы

Еще один способ сделать применение функций через apply быстрее — скомплилировать их. Мануал pandas дает 3 метода для этого. Первым идет Cython. Простой способ: импортировать ipython magic %%cython в начале ячейки с объявлениями функций и дальше комплятор сделает все сам. Более сложные оптимизации могут дать значительный эффект, но требуют знания языка C.

Второй метод — eval(). Он также работает автоматически. Нужно обернуть в этот метод математические вычисления в pandas. Для других операций это метод не работает.

Третий способ — пакет Numba. При объявлении функции необходимо указать декоратор @jit, также можно указать параметры для параллельной обработки (так например можно генерить случайные числа в numpy). Но о параллелизации и сторонних пакетах мы подробно поговорим в следующем блоке.

2. Параллелизация

Большинство современных компьютеров имеют процессор с более чем одним ядром, т.е. с возможностью делать несколько вычислений одновременно. Более того, часто аналитики и DS работают на сервере (например, c JupyterHub), у которого число ядер может достигать сотен.

Изначально интерпретатор python — однопоточная программа с глобальным локом во время вычисления, а значит в ванильном “pandas” нельзя параллелить задачи. Однако, в стандартной библиотеке python есть несколько модулей, позволяющих работать с многопоточностью, и далее мы разберем их применение.

Первый модуль — multiprocessing. Он обходит глобальный лок, работая не через под-потоки, а через под-процессы. Основной его параметр — это кол-во процессов, на которые будет биться основной. Обычно его выбирают равным кол-ву ядер в процессоре, но я советую брать число на 1 меньше, оставляя одно ядро на накладные расходы и синхронизацию. Далее создается пул воркеров (процессов), на которые будут параллелиться вычисления. Есть несколько вариантов, как разбить по ним датафрейм, но обычно используется .array_split():

import pandas as pd
import multiprocessing as mp


def your_datarame_func(df):
...

n_cores = max(mp.cpu_count() - 1, 1)
p = mp.Pool(n_cores) # Data parallelism Object

def parallelize_dataframe(df, func, n_cores):
df_split = np.array_split(df, n_cores)
df = pd.concat( pool.map(func, df_split) )
pool.close() ; pool.join()
return df

df_results = parallelize_dataframe(df, func=your_datarame_func)

Более высокоуровневый интерфейс предоставляет другой пакет из стандартной библиотеки — concurrent.futures, однако возможностей у него меньше. Он предоставляет 2 аналогичных API для работы с процессами и тредами — ProcessPoolExecutor и ThreadPoolExecutor.

import psutil
import pandas as pd
import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed

def your_datarame_func(df):
...

num_procs = max(psutil.cpu_count(logical=True) - 1, 1)

splitted_df = np.array_split(df, num_procs)
df_results = []

with ProcessPoolExecutor(max_workers=num_procs) as executor:
results = [executor.submit(your_datarame_func, df=df) for df in splitted_df]
for result in as_completed(results):
try:
df_results.append(result.result())
except Exception as ex:
print(str(ex))
pass

df_results = pd.concat(df_results)

В concurrent.futures можно получить pid порождаемых процессов (пример). Также, можно использовать сторонние пакеты для параллельных вычислений.

Сторонние пакеты

Первый пакет, который мы рассмотрим — Modin, поскольку он относительно зрелый (первый релиз в середине 2018). На сайте сообщается, что скорость растет почти линейно с увеличение кол-ва ядер. Modin имеет минимальный порог вхождения т.к. почти не требует изменять код. Вместо import pandas as pd достаточно прописать import modin.pandas as pd. Около 80% функций Pandas API поддерживается modin. При этом, надо выбрать какой движок будет обеспечивать параллельную работу — Dask или Ray (при установке пишем !pip install modin[dask] или !pip install modin[ray]). Однако, на бенчмарках Modin не всегда быстрее “ванильного” pandas, и на операциях вроде удаления дубликатов работает медленнее. Думаю что это происходит из-за операций, аналогичных shuffle в hadoop.

Существует вариант для тех у кого хорошая видеокарта. Пакет cudf работает поверх CUDA, и аналогично Modin достаточно изменить только импорт import cudf as pd, но также не все функции pandas поддерживаются. Этот пакет работает только с одним GPU, для нескольких — пакет dask-cudf. Однако, не стоит слишком надеяться на cuDF, поскольку он выигрывает у modin в только в операциях group by, а в других медленнее даже “ванильного” pandas.

Еще один пакет требующий лишь небольшого изменения кода — Swifter. Он ускоряет apply, используя по своему усмотрению или векторизацию, или распараллеливание через Dask, или чистый pandas. Кстати, он работает как с классическим pandas, так и с датафреймами Modin. Его использование достаточно простое:

# обычный apply
df['new_col'] = df.apply(lambda x: random_func(x['col1'], x['col2']))

# apply с помощью swifter
import swifter
df['new_col'] = df.swifter.apply(lambda x: random_func(x['col1'], x['col2']))

Еще один пакет, заточенный именно под .apply() — Pandarallel. В отличии от Swifter, он использует голый multiprocessing вместо Dask и не пытается в векторизацию, поэтому в основном проигрывает по скорости. Пакет очень простой, достаточно простого примера из документации:

from pandarallel import pandarallel
pandarallel.initialize(progress_bar=True)

# обычный apply
df.apply(func)

# apply с помощью pandarallel
df.parallel_apply(func)

Следующий пакет — Polars, изначально написанный для Rust но имеющий python-обертку. В отличии от Modin, его использование потребует изменение кода, а сам синтаксис скорее напоминает пакет dplyr для языка R. Пакет предоставляет два API: Eager API и Lazy API (читай мануал). Eager API работает по похожему с Pandas принципу (результаты сразу после запуска). Lazy API похож на Spark, где сначала происходит планирование пайплайна, а выполнение только после операций типа collect (вывод на экран и тд). При этом используются все доступные ядра. Основные операции, на которых будет заметное увеличение скорости — группировка и джойны. К сожалению, этот пакет также не имеет аналогов всех фукнций в pandas, но всегда вернуться к нему с помощью функции .to_pandas().

Другой пример R-like кода для ускорения — порт пакета data.table на python, datatable. Он дает хороший прирост по производительности, но работает не поверх, а вместо pandas, и требует достаточного времени на изучение.

Если данных так много что одна машина точно не справится, можно параллелить на несколько. Во-первых, можно использовать уже упомянутый Dask. Во-вторых, есть Pandas API для Spark (пример).

import pyspark.pandas as ps

Однако, работа с такими массивами данных все же не задача для Pandas. Я советую использовать вышеупомянутые пакеты для данных, которые уже слишком большие для ванильного pandas, но еще слишком маленькие для Spark’а. Для работы с big-data стеком стоит выучить стандартные для него инструменты, а не мучить pandas костылями.

В интернете и на medium в частности достаточно громких цифр про то, во сколько раз данные методы ускоряют pandas. Но надо понимать, что это зависит от твоих данных и операций, которые будут применяться, поэтому точного ответа нет. Могу рекомендовать широкое исследование от h2o.ai, по результатам которого выиграл Polars. Но для каждой задачи ты должен сам выбрать, какой метод применять, а данная статья дает лишь старт для этого. На машине с одним процессором (например google colab) все выглядит иначе, вот мой пример применения чтения CSV и сплитования для А/В с помощью хэш-функции.

Итог

Главное, что нужно понимать — не стоит применять оптимизацию там, где она не нужна, особенно если выигрыш по времени сильно меньше лага на изучение нового инструмента. Pandas довольно удобен, и на небольших данных я рекомендую использовать именно его. Вызов любого стороннего инструмента поверх pandas имеет некий оверхэд по производительности. В случае Modin, у которого свой класс dataframe, этот оверхэд просто огромен.

Выводы статьи, которые стоит вынести для себя:

  1. Premature optimization is the root of all evil © Дональд Кнут.
  2. Всегда следи за типами данных и с каким форматом файлов работаешь.
  3. Для простой оптимизации начни с использования инструментов, доступных по умолчанию (numpy, multiprocessing). Затем пробуй простые надстройки над pandas (swifter, modin) и только если будешь неудовлетворен используй более сложные методы (dask, polars).

P.S.: Если тебе понравилась стать, в качестве благодарности можешь купить мне кофе на https://www.donationalerts.com/r/stats_data_ninja или https://buymeacoffee.com/koch.

--

--

Stats&Data ninja
Stats&Data ninja

Responses (1)