代码内容
# -*- coding: utf-8 -*- #从datetime模块引入3个类:date,timedelta,datetime from datetime import date,timedelta,datetime """ 时间日期的转换和对比,是比较常见的操作 """ today_date=date.today()#获取当前日期:datetime.date类型 print("today_date={}".format(today_date)) today_date_str=today_date.strftime('%Y-%m-%d')#datetime.date类型转换为字符串 print("today_date_str={}".format(today_date_str)) today_datetime=datetime.strptime(today_date.strftime('%Y-%m-%d'), '%Y-%m-%d')#datetime.date类型转换为字符串,再转换为datetime.datetime类型 select_date_str='2018-06-30' select_datetime = datetime.strptime(select_date_str,'%Y-%m-%d')#字符串类型转换为datetime.datetime类型 print(type(select_datetime)) future_diff=(select_datetime-today_datetime).days#判断日期之前相差的天数 isrealtime=False if future_diff >= 0:#选择日期大于或等于今日今天 isrealtime=True pass print("所选日期是否大于或等于今日:{}".format(isrealtime)) oneday = timedelta(days=1)#时间差对象数据 yes_datetime=select_datetime-oneday#日期类型时间操作 yes_datetime_str=yes_datetime.strftime('%Y-%m-%d')#昨日时间,字符串类型 print("yes_datetime_str={0}".format(yes_datetime_str)) #取年、月、日 print("year={},month={},day={}".format(select_datetime.year,select_datetime.month,select_datetime.day)) #判断每月最后一天:原日期加上一天,如果的天数为1,那原日期就是上个月最后一天 print("is lastday:{}".format((select_datetime+oneday).day==1))
#更多时间类型属性和操作参考:https://docs.python.org/2.7/library/datetime.html
"""
常见数据处理操作
列表类型相加,类型格式如:
[{"id":1,"amount":120,"count":2},{"id":2,"amount":100,"count":3}]
[{"id":2,"amount":50,"count":5},{"id":3,"amount":130,"count":6}]
结果:
[{"id":1,"amount":120,"count":2},{"id":2,"amount":150,"count":8},{"id":3,"amount":130,"count":6}]
"""
def list_dict_add(list1,list2,key='id'): """ 两个list<dict>类型相加,两个list数据类型完全一样 指定key字段,除了key字段之外都是数字 """ result_data=None if list1!=None and list2!=None:#实时和离线都有值,对应结果值相加 c_real=len(list1) c_offline=len(list2) if c_real==0 and c_offline == 0: return [] if c_real>=c_offline: l1=list1 l2=list2 pass else: l1=list2 l2=list1 pass c_real=len(l1) c_offline=len(l2) result_data=l1[:]#.copy() ks=result_data[0].keys()#字典的所有key值 ks.remove(key)#去掉用来对比的主键,用来循环复制值 #第一次,循环list1,如果有对应的list2,相加并更新值 for i in range(0,c_real): d_real=result_data[i] for j in range(0,c_offline): d_offline=l2[j]; if d_real[key]==d_offline[key]: for k in ks: v_new=d_real[k]+d_offline[k]#实时+离线 d_real.update({k:v_new})#更新 pass break pass pass pass #第二次,循环离线数据,如果没有对应的结果数据,则增加一项到结果数据 for i in range(0,c_offline): d_offline=l2[i] has_real=False for j in range(0,c_real): d_real=result_data[j] if d_real[key]==d_offline[key]: has_real=True break pass if has_real == False: #c_real不需要变 result_data.append(d_offline.copy()) pass pass pass elif list1!=None and list2==None: result_data=list1[:] pass elif list1==None and list2!=None: result_data=list2[:] pass return result_data pass
"""
常见数据比较操作
列表类型数据比较,格式如:
[{"id":1,"amount":120,"count":2},{"id":2,"amount":100,"count":3}]
[{"id":2,"amount":150,"count":5},{"id":3,"amount":130,"count":6}]
比较之前,要自己排好序,格式完全一样
"""
def cmp_dict(src_data,dst_data,keypath=''): # assert type(src_data) == type(dst_data),"type: '{}' != '{}'".format(type(src_data), type(dst_data)) if isinstance(src_data,dict): assert len(src_data) == len(dst_data),"keypath:{},dict len: {} != {}".format(keypath,len(src_data), len(dst_data)) for key in src_data: keypath = key assert dst_data.has_key(key),"dst_data key not found: {}".format(keypath) cmp_dict(src_data[key],dst_data[key],keypath) elif isinstance(src_data,list): assert len(src_data) == len(dst_data),"list len: {} != {}".format(len(src_data), len(dst_data)) for idx,src_list, dst_list in zip(range(len(src_data)),src_data, dst_data): keypath = idx cmp_dict(src_list, dst_list,keypath) else: assert src_data == dst_data,"keypath: {}, value: {} != {}".format(keypath,src_data, dst_data) pass pass
#常用的map转换数据操作
#例子
data_apilist=[{"id":1,"profit":20,"income":50},{"id":2,"profit":30,"income":80}] result_apidata = map(lambda x: dict(id=x['id'],rate=round(float("{0:.2f}".format(float(x["profit"])/float(x["income"]))),2) ),data_apilist) print("result_apidata={}".format(result_apidata))
#查询mysq;数据库封装
import pymysql def query_mysql(self,sql): """查询mysql数据库 sql:查询字符串 """ conn = pymysql.connect(host='192.168.9.99', port=3306, user='test', passwd='test', db='mydb' ,charset='utf8') cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) cursor.execute(sql) rows = cursor.fetchall() conn.commit() cursor.close() conn.close() return rows
#post请求接口,返回json对象方法
def request_get_jsondata(self,posturl,postdata): """ posturl:请求接口地址 postdata:请求body参数 """ resp = requests.post(url=posturl,data=postdata,headers={'Content-Type':'application/json'}) data = json.loads(resp.content) return data