본문 바로가기
  • ANALOG CODE
  • AnalogCode
개발

InfluxDB 쿼리 개선

by 아날로그코더 2023. 3. 13.
반응형

InfluxDB

Time Series 데이터의 저장 및 검색에 최적화 되어있는 데이터베이스이다.

많은 수의 거래건과 관련된 데이터를 저장하여 시간 기준으로 분석하기 위해 사용하였다.

InfluxDB JOIN 쿼리

InfluxDB에 거래건에 대한 데이터를 저장하고 정해진 시간동안의 거래량과 가격차이에 대한 데이터를 가져오는 쿼리를 작성하였다.

아래와 같은 JOIN이 필요하다.

A JOIN B   ==>  X
C JOIN D  ==>   Y
X JOIN Y  ==>   최종 결과

 

1.  느린 쿼리

처음에는 아래와 같이 쿼리를 만들었다.

import "join"
import "date"

// 240분전부터 현재까지의 데이터 범위를 지정
// 문제가 되었던 부분: dataset 변수
dataset = from(bucket:"market") |> range(start: -240m) |> filter(fn: (r) => r["_measurement"] == "tick")


// 시작 누적거래량
volFirst = dataset |> filter(fn: (r) => r["_field"] == "acc_price") |> first()

// 마지막 누적거래량
volLast = dataset |> filter(fn: (r) => r["_field"] == "acc_price") |> last()

// 거래량
vol = join.inner(
  left: volFirst,
  right: volLast,
  on: (l, r) => l.product == r.product,
  as: (l, r) => ({
    l with product: l.product,
    volume: if r._value > l._value then r._value - l._value else r._value
  })
) |> group()


price = dataset |> filter(fn: (r) => r["_field"] == "price")

// 시작 가격
first = price |> first()

// 마지막 가격
last = price |> last()

// 가격 변화: 시작 가격과 마지막 가격의 차이
diff = join.inner(
    left: first,
    right: last,
    on: (l, r) => l.product == r.product,
    as: (l, r) => ({
        l with product: r.product, diff: r._value - l._value
    })
) |> group()


// 거래량의 가격 변화를 상품별로 조인
// 결과 컬럼: [상품, 거래량, 가격 변화]
join.inner(
    left: vol,
    right: diff,
    on: (l, r) => l.product == r.product, 
    as: (l, r) => ({
        product: l.product,
        volume: l._value,
        diff: r.diff
    })
)

 

변수 저장

결과를 얻기 위한 타켓 데이터를 독립적인 변수로 지정해 놓았다.

 

dataset = from(bucket:"market") |> range(start: -240m) |> filter(fn: (r) => r["_measurement"] == "tick")

 

쿼리 시간

타켓 데이터의 시간의 범위를 바꾸면서 쿼리시간을 측정하였다.

range(start: -240m)

시간의 범위가 늘어나는 비율로 쿼리의 속도도 느려졌다.

5분 0.3초
30분 0.9초
60분 1.5초
90분 2.3초
120분 4초
240분 7.4초

이상하다??

시간의 범위가 적던 크던 필요한 데이터는 처음값과 마지막 값의 차이로 결정이 되는 거라 상관이 없을줄 알았는데,

쿼리 시간이 같이 증가한다. 

이상한게 마지막 JOIN 쿼리 전까지의 결과만 보면 모두 빠르게 된다. 마지막 조인쿼리에서 대부분의 시간을 차지한다.

뭐가 문제인지 도저히 모르겠어서,

혹시나 하고 dataset 변수를 없애보았다.

 

 

2. 빠른 쿼리

타켓 데이터를 dataset이라는 변수로 정의해 놓고 사용하던걸 없앴다.

미세한 시간차이가 나지 않도록 미리 시간만 변수에 저장해 놓았다.

import "join"
import "date"

now = now()
startTime = date.sub(d: 240m, from: now)

// 시작 누적거래량
volFirst = from(bucket:"market") |> range(start: startTime, stop:now) |> filter(fn: (r) => r["_measurement"] == "tick") |> filter(fn: (r) => r["_field"] == "acc_price") |> first()
// 마지막 누적거래량
volLast = from(bucket:"market") |> range(start: startTime, stop:now) |> filter(fn: (r) => r["_measurement"] == "tick") |> filter(fn: (r) => r["_field"] == "acc_price") |> last()

// 거래량
vol = join.inner(
  left: volFirst,
  right: volLast,
  on: (l, r) => l.product == r.product,
  as: (l, r) => ({
    l with product: l.product,
    volume: if r._value > l._value then r._value - l._value else r._value
  })
) |> group()

// 시작 가격
first = from(bucket:"market") |> range(start: startTime, stop:now) |> filter(fn: (r) => r["_measurement"] == "tick") |> filter(fn: (r) => r["_field"] == "price") |> first()

// 마지막 가격
last = from(bucket:"market") |> range(start: startTime, stop:now) |> filter(fn: (r) => r["_measurement"] == "tick") |> filter(fn: (r) => r["_field"] == "price") |> last()

// 가격 변화: 시작 가격과 마지막 가격의 차이
diff = join.inner(
    left: first,
    right: last,
    on: (l, r) => l.product == r.product,
    as: (l, r) => ({
        l with product: r.product, diff: r._value - l._value
    })
) |> group()

// 거래량의 가격 변화를 상품별로 조인
// 결과 컬럼: [상품, 거래량, 가격 변화]
join.inner(
    left: vol,
    right: diff,
    on: (l, r) => l.product == r.product, 
    as: (l, r) => ({
        product: l.product,
        volume: l.volume,
        diff: r.diff
    })
)

 

 

속도 측정

5분 0.2초
30분 0.2초
60분 0.2초
90분 0.2초
120분 0.2초
240분 0.2초

시간의 범위가 늘어나도 쿼리 시간은 똑같다. 이게 내가 원한 결과였다.

 

위의 쿼리에 필요한 연산에는 복잡한 과정이 없다.

정해진 시간의 범위내에서 시작값과 마지막 값만 보면 되는 쿼리이다.

그런데도 데이터셋을 변수에 저장해놓고 이걸 2번의 JOIN 과정을 거치면서 사용하면 범위가 커질수록 느려진다.

쿼리의 최적화에 문제가 있는듯 하다.

 

변수에 저장하지 않고 모든 데이터셋을 하나하나 지정해서 쿼리를 만드니까 이러한 문제가 사라졌다.

 

InfluxDB 문서를 찾아봤지만, 관련해서 원인을 찾기가 쉽지가 않았다.

문서를 다시 한번 꼼꼼히 살펴봐야겠다.

 

반응형

댓글