Detección de cambios direccionales en Python

Share this

El algoritmo

Esta semana mi mujer compartió de forma humorística un tweet en el que decía que de buena mañana su marido le hablaba de cambios direccionales, series temporales y fractales. Nos pudimos reír con el tema y fue la comidilla entre algunos amigos unos días. El caso es que estuve esta semana pasada programando algunos algoritmos sobre el tema. Para entender la envergadura del problema, valga una breve lista de algunos estudios sobre esta materia pues no es un asunto trivial.

  • Detección y análisis de CD (cambio direccional) de saltos en datos intra-día. Hvozdyk, Lyudmyla (PhD).
  • Identificación de CD basado en eventos. Alkhamees, Nora. IADS.
  • Negociación de Hombro-Cabeza-Hombro basado en CD. Li, Shengnan. CCFEA.
  • Hechos estilizados (regularidades empíricas) en CD, negociación algorítmica. Golub, Anton.
  • Negociación algorítmica. Kampouridis, Michael (PhD). Univ. de Kent.
  • Análisis de rebasamiento. Sun, Jianyong (PhD). Universidad de Essex.
  • Caracterización de CD, datos de alta frecuenca. Serguieva, Antoaneta (PhD). Computación financiera y Analítica, UCL.
  • Caracterización del CD. Gao, Jing. Universidad de Beihang.
  • Carecterización de CD: glosario y comparación. Tao, Ran (PhD). CCFEA.
  • Negociación algorítmica (proyecto de doctorado). Ye, Alan. Universidad de Greenwhich.

Y claro, uno se pregunta qué hace tanto doctor y doctorando escribiendo sobre el asunto. Y al final se tira una semana estudiando el problema y buscando la solución más sencilla. Es lo que tiene dedicarse a la ingeniería del software: cuando más fácil y más rápido, mejor.

Así que he escrito una pequeña variación del algoritmo de Bill Williams para calcular fractales. Sus aplicaciones en trading son innumerables. Desde cálculo de canales, soportes y resistencias, trading armónico, categorización de sesiones del mercado bursátil, determinación de momento y tendencia del valor de instrumentos financieros, análisis de patrones técnicos y todo lo que a quien lo explote se le ocurra.

import numpy as np
import pandas as pd
 
class Fractals(object):
    __data = None
    __peaks = None
    __valleys = None
    __column_mode = None
 
    def __init__(self):
        pass
 
    def getFractals(self, data, column_mode="high", depth=3):
        Bars = data.shape[0]
        peaksBuffer = np.empty(Bars)
        valleysBuffer = np.empty(Bars)
        peaksBuffer [ : ] = np.NaN
        valleysBuffer [ : ] = np.NaN
 
        i = depth
 
        while (i < Bars - depth - 1):
 
            is_upper_fractal = False
            is_lower_fractal = False
 
            lower_range_pos = i - depth
            upper_range_pos = i + depth + 1
            N = lower_range_pos + depth
 
            lower_range_values = data.iloc[lower_range_pos:N][column_mode].values
            upper_range_values = data.iloc[N + 1:upper_range_pos][column_mode].values
            N_value = data.iloc[N][column_mode]
 
            # Basic Fractal:
            # Peaks
            if np.append([N_value], lower_range_values).argmax() == 0 and\
                    np.append([N_value],upper_range_values).argmax() == 0:
 
                if N_value not in lower_range_values:
                    is_upper_fractal = True
                    peaksBuffer[N] = N_value
 
            # Valleys
            if not is_upper_fractal:
                if np.append([N_value], lower_range_values).argmin() == 0 and \
                        np.append([N_value], upper_range_values).argmin() == 0:
 
                    if N_value not in lower_range_values:
                        is_lower_fractal = True
                        valleysBuffer[N] = N_value
            i += 1
 
            peaksBufferSeries = pd.Series(peaksBuffer, name="peaks", index=data.index).dropna()
            valleysBufferSeries = pd.Series(valleysBuffer, name="valleys", index=data.index).dropna()
 
            self.__data = data
            self.__peaks = peaksBufferSeries
            self.__valleys = valleysBufferSeries
            self.__column_mode = column_mode
 
        return pd.merge(
            peaksBufferSeries, valleysBufferSeries, left_index=True, right_index=True, how="outer", sort=False
        )

¿Cómo se utiliza esto? Hace falta pasarle un DataFrame de pandas cuyo índice sea de tipo datetime y que esté ordenado ascendente.

Los datos

Supongamos que tenemos el siguiente DataTrame:

date open high low close
2003-05-28 08:40:00 1.1782 1.17968 1.17619 1.1795
2003-05-28 08:45:00 1.17967 1.18012 1.17967 1.18004
2003-05-28 08:50:00 1.17996 1.18007 1.17939 1.17939
2003-05-28 08:55:00 1.17932 1.17944 1.17691 1.17695
2003-05-28 09:00:00 1.17702 1.17796 1.17702 1.1779
2003-05-28 09:05:00 1.17795 1.17823 1.17759 1.17759
2003-05-28 09:10:00 1.17768 1.17805 1.17756 1.17802
2003-05-28 09:15:00 1.17791 1.17802 1.1778 1.17787
2003-05-28 09:20:00 1.17796 1.17923 1.17773 1.17923
2003-05-28 09:25:00 1.17933 1.17935 1.17786 1.17789
2003-05-28 09:30:00 1.17793 1.17832 1.17762 1.17793
2003-05-28 09:35:00 1.17794 1.17839 1.17789 1.17802
2003-05-28 09:40:00 1.17808 1.17837 1.177 1.17703
2003-05-28 09:45:00 1.17673 1.17827 1.17666 1.17827
2003-05-28 09:50:00 1.17819 1.1783 1.17779 1.17788
2003-05-28 09:55:00 1.17784 1.17872 1.17784 1.17872
2003-05-28 10:00:00 1.17847 1.17867 1.17833 1.17844
2003-05-28 10:05:00 1.17863 1.17886 1.17852 1.17862
2003-05-28 10:10:00 1.17865 1.17872 1.17771 1.17772
2003-05-28 10:15:00 1.1775 1.1787 1.17744 1.17839
2003-05-28 10:20:00 1.1784 1.17883 1.17837 1.17859
2003-05-28 10:25:00 1.17859 1.17869 1.17842 1.17855
2003-05-28 10:30:00 1.17859 1.1787 1.17839 1.17851
2003-05-28 10:35:00 1.17845 1.17874 1.17831 1.17867
2003-05-28 10:40:00 1.17866 1.17977 1.17818 1.17861
2003-05-28 10:45:00 1.17852 1.17866 1.17822 1.17856
2003-05-28 10:50:00 1.1787 1.17893 1.1786 1.1786
2003-05-28 10:55:00 1.17855 1.17855 1.17786 1.17786
2003-05-28 11:00:00 1.17769 1.17783 1.17426 1.17429
2003-05-28 11:05:00 1.17429 1.17474 1.17411 1.17456
2003-05-28 11:10:00 1.17467 1.17516 1.17462 1.17484
2003-05-28 11:15:00 1.17472 1.17517 1.17217 1.17517
2003-05-28 11:20:00 1.17518 1.17537 1.17493 1.17493
2003-05-28 11:25:00 1.17493 1.17532 1.17469 1.17511
2003-05-28 11:30:00 1.17504 1.17504 1.17436 1.17469
2003-05-28 11:35:00 1.17471 1.17563 1.17471 1.17558
2003-05-28 11:40:00 1.17559 1.17793 1.17528 1.17529
2003-05-28 11:45:00 1.17519 1.17559 1.17506 1.17547
2003-05-28 11:50:00 1.17558 1.17572 1.17533 1.17533
2003-05-28 11:55:00 1.17536 1.17536 1.17307 1.17307
2003-05-28 12:00:00 1.17268 1.17268 1.17166 1.17167
2003-05-28 12:05:00 1.17168 1.17237 1.17147 1.17224
2003-05-28 12:10:00 1.17214 1.17228 1.17194 1.17217
2003-05-28 12:15:00 1.17224 1.17237 1.17187 1.17209
2003-05-28 12:20:00 1.17205 1.17229 1.17153 1.17153
2003-05-28 12:25:00 1.17121 1.17226 1.17047 1.17204
2003-05-28 12:30:00 1.17208 1.1726 1.17181 1.17259
2003-05-28 12:35:00 1.17261 1.17264 1.17182 1.17192
2003-05-28 12:40:00 1.17203 1.17337 1.17189 1.17337
2003-05-28 12:45:00 1.17351 1.1736 1.1714 1.17153

Si el campo «date» es una columna y el dataframe se llama «data», por ejemplo, tendríamos que escribir lo siguiente para poder usarlo como argumento, pues el  índice ha de ser datetime:

data = data.set_index("date")

Por supuesto el código de la clase anterior se puede modificar para funcionar con numpy o cualquier otra biblioteca.

La aplicación

Si queremos obtener los máximos y mínimos locales de los valores «high» de nuestra serie de datos utilizando 6 valores para calcular los picos y valles, es tan simple como hacer esto:

#"data" es un DataFrame que contiene los datos de la tabla de arriba.
fr = fractals()
high_fractals = fr.getFractals(data=data,column_mode="high", depth=3)

El resultado se cargaría en la variable high_fractals y sería el siguiente:

Y para extraer los máximos, por ejemplo, solo bastaría ejecutar el siguiente código:

Con una profundidad de 2, éste sería el resultado para los datos:

import matplotlib.pyplot as plt
 
fr = fractals() 
#"data" es un DataFrame que contiene los datos de la tabla de arriba. fr = fractals() 
high_fractals = fr.getFractals(data=data,column_mode="high", depth=2)
 
fig, ax = plt.subplots()
fig.set_size_inches(10, 5)
plt.plot(high_fractals["valleys"].dropna(), marker='^', linestyle="none", markersize=7,  color="red")
plt.plot(high_fractals["peaks"].dropna(), marker='v', linestyle="none", markersize=7,  color="green")
plt.plot(data.high, color="black")
plt.show()

El resultado es más que satisfactorio para la mayoría de escenarios.  En mi caso, lo voy a mejorar añadiendo también identificación de movimientos a partir de cierto nivel (porcentaje, pips, etc). Pero, en general, este sencillo algoritmo cumple con todas las expectativas que personalmente necesito para marcar cambios direccionales en series temporales. Menos es más.

Si en vez de mostrar la serie sobre la que hemos obtenido los picos y valles, empleamos toda la serie de precios, el resultado será mucho más obvio. Para el siguiente ejemplo, vamos a utilizar un total de 200 puntos, en vez de los 50 de antes.

#cargamos un dataframe con 200 datos.
data = all_historical_data.dropna()[5000:5200]
 
fr = fractals()
 
high_fractals = fr.getFractals(data=data,column_mode="high", depth=3)
high_fractals["fractal_type"]="highs"
high_fractals=high_fractals.reset_index().set_index(["fractal_type","date"])
 
low_fractals = fr.getFractals(data=data,column_mode="low", depth=3)
low_fractals["fractal_type"]="lows"
low_fractals=low_fractals.reset_index().set_index(["fractal_type","date"])
 
# Vamos a incluir una serie de librerías que nos interesan.
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import matplotlib.dates as mdates
from mpl_finance import candlestick_ohlc
from datetime import date
import datetime
 
fig, ax1 = plt.subplots(figsize=(20,5))
ax1.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
 
#Creamos un conjunto de velas japonesas.
number_of_ticks = data.shape[0]
dates = data.index
x=0
ohlc=[]
while x < len(dates):
    d = mdates.date2num(dates[x])
    append_me = d, data["open"].values[x], data["high"].values[x], data["low"].values[x], data["close"].values[x]
    ohlc.append(append_me)
    x += 1
 
# Añadimos bordes a las velas.
lines, patches = candlestick_ohlc(ax1, ohlc, width=0.2*2.5/number_of_ticks, colorup="green", colordown="red")
for line, patch in zip(lines, patches):
    patch.set_edgecolor("black")
    patch.set_linewidth(0.9)
    patch.set_antialiased(False)
    line.set_color("black")
    line.set_zorder(0)
 
#Seleccionamos los máximos de los Highs (Higher Highs) y los mínimos de los lows (Lower Lows)
hfs = high_fractals.loc["highs"].drop("valleys",axis=1).dropna()
lfs = low_fractals.loc["lows"].drop("peaks",axis=1).dropna()
 
plt.plot(hfs, marker="o", linestyle="None")
plt.plot(lfs, marker="o", linestyle="None")
plt.savefig("fractals.png")
plt.show()

Y el resultado es la gráfica siguiente:

¿Y qué se puede hacer con estos puntos? Con dos puntos podemos pintar una línea, por ejemplo.

data = high_fractals["peaks"].dropna()
TF_seconds = 300 #timeframe de 5 minutos.
peaks = []
 
for i in range(1, data.shape[0]):
    peaks.append( { "type": "H", 
    "x1": data[i - 1:i].index, "y1": data[i - 1], 
    "x2": data[i:i + 1].index, "y2": data[i], 
    )
 
for point in peaks:
    diference = (point["x2"] - point['x1']).seconds[0]
    units = diference / TF_seconds
    slope = (point["y2"] - point["y1"]) / units
    slope = slope / 0.0001
    peaks[i]["slope"] = slope
    peaks[i]["units"] = units
    i += 1

Y he ahí todos los vectores directores de todas las líneas de todos los picos consecutivos. Con meta-información relativa a su pendiente e incremento de X en formato escalar (aparte de datetime). La imagen de abajo ilustra la estructura de la implementación completa, donde la pendiente está calculada en pips:

La imaginación es el límite. Podríamos utilizar estos datos para analizar varias características de nuestros vectores directores. Veamos un ejemplo con la implementación completa. Para ello vamos a unir en un solo dataframe todos los fractales y todos los vectores directores en segundo lugar:

all_analyzed_data=pd.concat([high_fractals,low_fractals])

Lo que nos generaría una estructura similar a ésta:

Con los datos de los fractales, vamos a generar los vectores directores entre cada punto y todos los puntos posteriores al mismo.

Digits=4
# Low peaks -> the lower high points
# Low valleys -> the lower low points
# High peaks -> the Higher high points
# High valleys -> THe Higher high points
 
tl_higher_highs = Trendlines(dataframe=all_analyzed_data.loc["highs"],column_mode="peaks",seconds=300, digits=Digits)
tl_higher_lows = Trendlines(dataframe=all_analyzed_data.loc["highs"],column_mode="valleys",seconds=300, digits=Digits)
 
tl_lower_lows = Trendlines(dataframe=all_analyzed_data.loc["lows"],column_mode="peaks",seconds=300, digits=Digits)
tl_lower_highs = Trendlines(dataframe=all_analyzed_data.loc["lows"],column_mode="valleys",seconds=300, digits=Digits)

Cada una de las cuatro variables, tendría una estructura similar a ésta:

Cada fila de la tabla anterior representa un vector director que une dos puntos de los identificados con el algoritmo de fractales del principio. Como estoy analizando los Higher Highs, me voy a centrar en los picos. Los datos del dataframe incluyen:

  • Tipo de vector: valle o pico.
  • x1: fecha del primer punto.
  • y1: precio del primer punto.
  • x2, y2: fecha y precio del segundo punto.
  • height: Altura entre y1 e y2.
  • pips: Altura entre y1 e y2 en pips.
  • slope: pendiente del vector en pips por unidades temporales (pips/5m en este caso).
  • width: Distancia horizontal en número de unidades temporales entre el punto 1 y el punto 2 (cada unidad temporal son 5 minutos en este caso).

Quitamos información innecesaria y preparamos el dataset para su análisis.

data_hhs = tl_higher_highs.getFractalsVectors()
data_hhs_peaks=data_hhs.reset_index().drop(["x1","x2","index_type"],axis=1)

Con lo que ahora tenemos el siguiente DataFrame:

Veamos algunas características. Como la distribución de número de pips de cada vector director.

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
from scipy import stats
 
sns.distplot(data_hhs_peaks["pips"].values)
plt.show()

Comparar la altura en pips de los puntos con la pendiente de la recta que los une.

df = data_hhs_peaks[["pips","slope"]]
#df = df[df.slope.apply(abs)<=1.5]
df.columns = ["x","y"]
 
g = sns.jointplot(x="x", y="y", data=df, kind="kde", color="brown")
g.plot_joint(plt.scatter, c="b", s=30, linewidth=1, marker="+")
g.ax_joint.collections[0].set_alpha(0)
g.set_axis_labels("$Pendiente$", "$Altura$");

Y comprender mejor el entorno sobre el que estamos trabajando. Porque no es sino con la comprensión profunda del problema, con lo que más oportunidades tenemos para encontrar soluciones óptimas.

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Este sitio usa Akismet para reducir el spam. Aprende cómo se procesan los datos de tus comentarios.