布林线WIKI:https://zh.wikipedia.org/wiki/%E5%B8%83%E6%9E%97%E5%B8%A6
标准差WIKI:https://zh.wikipedia.org/wiki/%E6%A8%99%E6%BA%96%E5%B7%AE
布林线计算公式:
1.计算MA
MA = 最近N日收盘价之和 / N日
2.计算MD
MD = 平方根 ((最近N日(收盘价 – MA)平方之和)/N)
3.计算MB、UP、DN
MB = 收盘价的N日简单移动平均值
UP = MB + P * MD
DN = MB – P * MD
布林带天数参数默认值:20,P决定布林带的宽度,默认值为2。
下面是一个Storm节点计算BOLL指标的代码片段:
class MarketCandleIndicatorCalcBOLL(Bolt): outputs = ["exchange_symbol", "exchange", "symbol", "update_candle_period_list", "market_cache_json"] def initialize(self, storm_conf, context): pass ######################################################################################## def _calc_boll(self, marketDict, calcPeriod, width): indiStr = "boll_" + str(calcPeriod) + "_" + str(width) maIndiStr = "ma" + str(calcPeriod) for period in marketDict["candle"]: for idx, candleDict in enumerate(marketDict["candle"][period]): if idx < calcPeriod - 1: continue if "ma" not in marketDict["indicator"][period][idx]: continue if maIndiStr not in marketDict["indicator"][period][idx]["ma"]: continue if "boll" not in marketDict["indicator"][period][idx]: marketDict["indicator"][period][idx]["boll"] = dict() if indiStr in marketDict["indicator"][period][idx]["boll"]: continue currMa = marketDict["indicator"][period][idx]["ma"][maIndiStr] closeStdDevSum = 0.0 for beforeIdxDev in xrange(calcPeriod): closeStdDevSum += (marketDict["candle"][period][idx - beforeIdxDev]["close"] - currMa) ** 2 closeMd = math.sqrt(closeStdDevSum / calcPeriod) bollMb = currMa bollUp = bollMb + width * closeMd bollDn = bollMb - width * closeMd if bollUp - bollDn != 0: bollPb = round((candleDict["close"] - bollDn) / (bollUp - bollDn) * 100, 2) else: bollPb = 50.0 if bollMb != 0: bollBw = round((bollUp - bollDn) / bollMb * 100, 2) else: bollBw = 0.0 marketDict["indicator"][period][idx]["boll"][indiStr] = { "up": bollUp, "middle": bollMb, "down": bollDn, "pb": bollPb, "bw": bollBw } ######################################################################################## def process(self, tup): routerField = tup.values[0] exchange = tup.values[1] symbol = tup.values[2] updatePeriodListStr = tup.values[3] marketJson = tup.values[4] try: marketDict = json.loads(marketJson) except Exception, e: self.logger.error("Market cache json loads failed. Exception: " + str(e)) return self._calc_boll(marketDict, 20, 2) try: marketIndicatorJson = json.dumps(marketDict, ensure_ascii=False) except Exception, e: self.logger.error("Dumps json of market BOLL indicator failed. Exception: " + str(e)) marketIndicatorJson = marketJson self.emit([routerField, exchange, symbol, updatePeriodListStr, marketIndicatorJson])