说明

UCS对象是基于GFGoLite进行封装,且侧重于实现UCS规范。

内容

1 函数

我发现pydantic是一个特别好用的工具:它既能确保数据传递时的可靠性,也能清晰地描述数据模型。

以下,UCS给出了与id、time相关的brick映射,以及时间字符串到数值时间戳的转换。其方便之处在于:本地几乎不需要关心具体实现,只要有GFGoLite服务支持即可,效率也足够高。

from typing import List, Optional
from pydantic import BaseModel

import requests as req 
class UCS(BaseModel):
    """HTTP client for the UCS endpoints exposed by a GFGoLite server.

    Each public method POSTs a small JSON payload to
    ``<gfgo_lite_server><endpoint>/`` and returns the decoded JSON response
    as-is; the response is not validated here.

    Fields:
        gfgo_lite_server: Base URL of the GFGoLite service. A trailing slash
            is optional -- one is inserted before the endpoint if missing.
        timeout: Optional ``requests`` timeout in seconds, applied to every
            call. ``None`` (the default) keeps the original behaviour of
            waiting indefinitely; set it for long-running workers so a
            stalled server cannot block them forever.
    """
    # NOTE(review): the method groups below are labelled "1.1" and "1.2";
    # confirm whether this version number should be bumped.
    __version__ = 1.1
    gfgo_lite_server: str = 'http://172.17.0.1:24090/'
    timeout: Optional[float] = None

    def _post(self, endpoint, payload):
        """POST ``payload`` as JSON to ``endpoint`` and return the decoded JSON body."""
        base = self.gfgo_lite_server
        if not base.endswith('/'):
            # Plain concatenation would otherwise produce URLs such as
            # 'http://host:24090get_brick_name/'.
            base += '/'
        resp = req.post(base + endpoint, json=payload, timeout=self.timeout)
        return resp.json()

    # ---- id bricks ----
    def get_brick_name(self, some_id = None):
        """Return the server's brick name for one record id (sent as ``rec_id``)."""
        return self._post('get_brick_name/', {'rec_id': some_id})

    def get_brick_name_s(self, some_id_list = None):
        """Batch form of ``get_brick_name`` (ids sent as ``rec_id_list``)."""
        return self._post('get_brick_name_s/', {'rec_id_list': some_id_list})

    def gfgo(self, kwargs = None, pack_func = None):
        """Forward a generic ``kwargs`` / ``pack_func`` call to the ``gfgo/`` endpoint."""
        return self._post('gfgo/', {'kwargs': kwargs, 'pack_func': pack_func})

    # ---- datetime string -> number ----
    def dt_str2num(self, some_dt_str = None):
        """Convert one datetime string to a number via the server.

        Example from the author's deployment:
        '2024-01-31 11:11:11' -> 1706670671.
        """
        return self._post('str2num/', {'some_dt_str': some_dt_str})

    def dt_str2num_s(self, some_dt_str_list = None):
        """Batch form of ``dt_str2num`` (strings sent as ``some_dt_str_list``)."""
        return self._post('str2num_s/', {'some_dt_str_list': some_dt_str_list})

    # ---- time-brick names ----
    def get_time_brick_name(self, dt_str_or_ts = None):
        """Return the time-brick name for a datetime string or timestamp.

        Example: '2024-01-31 11:11:11' -> '2024.01.31.11'.
        """
        return self._post('get_time_brick_name/', {'dt_str_or_ts': dt_str_or_ts})

    def get_time_brick_name_s(self, dt_str_or_ts_list = None):
        """Batch form of ``get_time_brick_name``."""
        return self._post('get_time_brick_name_s/',
                          {'dt_str_or_ts_list': dt_str_or_ts_list})

    # 1.1 >>>
    def get_brick_list(self, start_brick_name = None, end_brick_name = None):
        """Ask the server for the ordered bricks between two brick names.

        Whether ``end_brick_name`` itself is included is decided server-side.
        """
        return self._post('get_brick_list/', {
            'start_brick_name': start_brick_name,
            'end_brick_name': end_brick_name,
        })

    def get_brick_bounds(self, brick_name = None):
        """Return the server-side bounds of a single brick."""
        return self._post('get_brick_bounds/', {'brick_name': brick_name})

    # 1.2 >>> time
    # TODO (author): get_time_brick_bounds_s was marked as pending.
    def get_time_brick_bounds_s(self, brick_name_list = None, char_or_num = 'char'):
        """Return [start, end] bounds for each time brick in ``brick_name_list``.

        With the default ``char_or_num='char'`` the bounds come back as
        datetime strings, e.g. '2024.01.31.11' ->
        ['2024-01-31 11:00:00', '2024-01-31 12:00:00']. Presumably 'num'
        yields numeric timestamps instead -- confirm against the server.
        """
        return self._post('get_time_brick_bounds_s/', {
            'brick_name_list': brick_name_list,
            'char_or_num': char_or_num,
        })

    def get_time_brick_list(self, start_brick_name = None, end_brick_name = None):
        """Return ordered time-brick names from ``start_brick_name`` towards ``end_brick_name``.

        Example: starting at '2024.01.31.11' yields
        ['2024.01.31.11', '2024.01.31.12', ...]. Whether the end brick is
        included is decided server-side.
        """
        return self._post('get_time_brick_list/', {
            'start_brick_name': start_brick_name,
            'end_brick_name': end_brick_name,
        })

2 应用

2.1 时间转换

# Build a client pointing at your GFGoLite server (replace xxx with the host).
ucs = UCS(gfgo_lite_server = 'http://xxx:24090/')
# Single datetime string -> number; the next line is the output shown by the author.
ucs.dt_str2num('2024-01-31 11:11:11')
1706670671

# Batch version: list of datetime strings -> list of numbers (output on the next line).
ucs.dt_str2num_s(['2024-01-31 11:11:11','2024-02-01 11:11:11' ])
[1706670671, 1706757071]

反函数(时间戳转回时间字符串)可以用get_time_str1(它不属于UCS类):

In [19]: get_time_str1(1706670671)
Out[19]: '2024-01-31 11:11:11'

2.2 UCS-time

一个简单的应用如下:我需要制定一个静态数据同步计划,该计划需要沿着时间轴,按照时间块的顺序向前推进。具体来说,这对数据库迁移(合并)与回测都很有用。

# Map two datetimes to their time-brick names (each call is followed by its output).
tbrick1 = ucs.get_time_brick_name('2024-01-31 11:11:11')
'2024.01.31.11'
tbrick2 = ucs.get_time_brick_name('2024-02-10 11:11:11')
'2024.02.10.11'

# Enumerate the ordered bricks from tbrick1 towards tbrick2 (output truncated).
tbrick_list = ucs.get_time_brick_list(start_brick_name = tbrick1, end_brick_name = tbrick2)
['2024.01.31.11', '2024.01.31.12', '2024.01.31.13', ...]

# Bounds of the first three bricks as [start, end] strings (char_or_num defaults to 'char').
ucs.get_time_brick_bounds_s(tbrick_list[:3])
[['2024-01-31 11:00:00', '2024-01-31 12:00:00'],
 ['2024-01-31 12:00:00', '2024-01-31 13:00:00'],
 ['2024-01-31 13:00:00', '2024-01-31 14:00:00']]

btw,在探索完UCS-time之后我发现:大概因为时间的表示方式已经深入人心,时间字符串其实可以更自由地在本地直接转换为UCS名称,例如:

def dt_str2ucs_blockname(some_dt_str):
    """Convert a general datetime string into a UCS time-brick name.

    Every '-', ' ' and ':' is mapped to '.', and only the first four
    dot-separated fields are kept. For a 'YYYY-MM-DD HH:MM:SS' string these
    are year, month, day and hour.

    >>> dt_str2ucs_blockname('2024-06-24 09:30:00')
    '2024.06.24.09'
    """
    dotted = some_dt_str.translate(str.maketrans('- :', '...'))
    fields = dotted.split('.')
    return '.'.join(fields[:4])

这些函数也被纳入了基础函数,有时可以起到辅助作用,特别是在使用GFGoLite不那么方便的时候。

一个更具体的场景,我觉得是具有通用性的,描述如下:

  • 1 数据没有顺序id,但是具备入库时间(create_time)
  • 2 数据入库时间是不连续的,有一些时间段密集,更多的时间可能空闲,但也并不是一条没有。
  • 3 每次执行时,worker总是能够有效推进一个brick

具体做法:

p01: 将需要的时间块列表按序持久化为本地文件,并将对应的bounds整理为字典(brick名称 -> bounds),同样持久化到本地。

p02: 获取当前时间的时间块

def timestr2timeblock(some_time_str = None):
    """Turn a standard time string into its time-brick name.

    Each of '-', ' ' and ':' becomes '.', then the first four fields are
    kept, e.g. '2024-01-31 11:11:11' -> '2024.01.31.11'.

    Despite the ``None`` default, a string is required: calling this with
    no argument raises AttributeError.
    """
    normalized = some_time_str
    for sep in ('-', ' ', ':'):
        normalized = normalized.replace(sep, '.')
    head = normalized.split('.')[:4]
    return '.'.join(head)

# p02: brick name for "now". get_time_str1 is defined elsewhere; called with no
# argument it presumably returns the current time as 'YYYY-MM-DD HH:MM:SS' -- TODO confirm.
the_time_now = get_time_str1()
the_time_brick = timestr2timeblock(the_time_now)

p03:加载持久化的时间块列表与bounds字典,初始化gb(GlobalBuffer),并定位当前时间块在时间轴上的位置;上一次处理过的最新brick随后从gb中获取(见p04)。


# Key namespace for this worker's state in GlobalBuffer: "<space>.<tier1>.<tier2>."
worker_buffer_space = 'sp_worker.general'
tier1 = 'some_tier'
tier2 = 'ucs_time_brick_ordered.sniffer'
# Joining with a trailing empty element yields the trailing '.' separator.
prefix = '.'.join((worker_buffer_space, tier1, tier2, ''))

# ==========================  Load
# Ordered brick names and the {brick_name: bounds} dict persisted in step p01.
time_brick_list = from_pickle('time_brick_list')
time_bounds_dict = from_pickle('time_bounds_dict')
gb = GlobalBuffer()
# Position of the brick containing the current time on the persisted axis.
# NOTE: list.index raises ValueError if the_time_brick is not in time_brick_list
# (e.g. the persisted list no longer covers the current time).
watch_brick_pos = time_brick_list.index(the_time_brick)

p04:按照已处理之后,当前观察时间之前的原则,选出available_time_brick_list

# p04: keep bricks strictly after the last handled one and strictly before the
# brick containing the current time (presumably because that brick may still be
# receiving data).
latest_data_brick = gb.getx(prefix +'last_brick_handled')
if latest_data_brick is None:
    # Nothing handled yet: -1 makes the slice below start at index 0.
    # (Using 0 here silently skipped the very first brick, time_brick_list[0].)
    start_pos = -1
else:
    # NOTE: list.index raises ValueError if the stored brick is not in time_brick_list.
    start_pos = time_brick_list.index(latest_data_brick)

available_time_brick_list = time_brick_list[start_pos + 1:watch_brick_pos]

p05:按顺序遍历时间块,直到处理到一个有效的brick才中断本次循环

# p05 -- pseudo-code: the 【...】 markers are steps the author elided, so this is
# not runnable as-is. Walk the available bricks in order and stop after the first
# one with data, so each worker run advances by at most one non-empty brick.
# NOTE(review): empty bricks are skipped without being recorded, so if none of
# available_time_brick_list has data, last_brick_handled is not advanced and the
# next run re-scans the same empty bricks.
# cur_data_brick = available_time_brick_list[0]
for cur_data_brick in available_time_brick_list:
    print(cur_data_brick)
    # Bounds of this brick, from the dict persisted in p01.
    cur_bound = time_bounds_dict[cur_data_brick]
    print(cur_bound)
    # Placeholder: fetch the records to process for cur_bound.
	resp = 【获取待处理数据】
    if len(resp):
        # keep_cols, ok_to_handle and xxx are not defined in this snippet;
        # presumably supplied by the real worker.
        res_df = pd.DataFrame(resp, columns = keep_cols)
        if ok_to_handle:
            # Placeholder: delete the results in batches of 1000 records.
        	【将结果分批删除】
            listofdict2 = slice_list_by_batch2(xxx.to_dict(orient='records'), 1000)
            for some_listofdict in listofdict2:
        # Placeholder: update the handled-brick record (loop body above elided).
        【更新已处理块信息】
        # NOTE(review): runs whenever resp is non-empty, even if ok_to_handle is falsy.
        gb.setx(prefix +'last_brick_handled',cur_data_brick, persist=True)
        break

通过循环或者定时任务唤起这样的worker就可以。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部