mongoDataExportScript.py 5.6 KB


  1. # @Time:2018-10-20
  2. # @Author:chenhao
  3. # @File:mongoDataExportScript.py
  4. """
  5. 本脚本文件主要用于 MongoDB 数据导出
  6. 使用方法:
  7. 使用本脚需要安装pymongo和bson库
  8. 安装方法:pip install xxxx
  9. 本脚本接受两个参数:
  10. 1:文件存放路径(文件路径,不包括文件名)根据路径查找最新文件,取其最新的object Id,作为开始时间。
  11. 2:开始日期 (格式为:yyyymmddhhmmss)根据提供的时间作为开始时间。
  12. 例如:
  13. 方法1:mongoDataExportScript.py /data/
  14. 方法2:mongoDataExportScript.py /data/ 20181017083059
  15. """
  16. import os
  17. import time
  18. import pandas as pd
  19. import datetime
  20. import sys
  21. import pymongo
  22. from bson.objectid import ObjectId
  23. def datetime2objectId(from_datetime=None,
  24. span_days=0,
  25. span_hours=0,
  26. span_minutes=0,
  27. span_weeks=0):
  28. """
  29. 根据时间手动生成一个objectid,此id不作为存储使用
  30. 参数:
  31. from_datetime:datetime datetime时间
  32. span_days:int 偏移天(+:往前/-:往后)
  33. span_hours:int 偏移小时
  34. span_minutes:int 偏移分钟
  35. span_weeks:int 偏移周
  36. return: ObjectId
  37. """
  38. from_datetime = from_datetime + datetime.timedelta(days=span_days,
  39. hours=span_hours-8,
  40. minutes=span_minutes,
  41. weeks=span_weeks)
  42. return ObjectId.from_datetime(generation_time=from_datetime)
  43. def range_search(start_timestamp, end_timestamp,ip,Uname,passwd,DB,Table):
  44. """
  45. 读取指定时间内的MongoDB数据
  46. 参数:
  47. start_timestamp:ObjectId 开始时间
  48. end_timestamp:ObjectId 结束时间
  49. ip:str 主机地址
  50. Uname:str 用户名
  51. passwd:str 密码
  52. DBname:str 数据库名
  53. Table:str 数据表名
  54. return: DataFrame
  55. """
  56. con = pymongo.MongoClient(ip)
  57. with con:
  58. db = con[DB]
  59. tableData = db[Table]
  60. return pd.DataFrame(list(tableData.find({'_id':{'$lt':end_timestamp,'$gte':start_timestamp}})))
  61. def newFile(dir):
  62. """
  63. 从指定目录中找到最新修改的文件
  64. 参数:
  65. dir:str 目录
  66. return:文件相对路径
  67. """
  68. # 列出目录下所有的文件
  69. filelist = os.listdir (dir)
  70. # print(filelist)
  71. # 对文件修改时间进行升序排列
  72. filelist.sort (key=lambda fn: os.path.getmtime (dir + '/' + fn))
  73. # 获取文件所在目录
  74. return filelist[-1]
  75. def getLastId(filepath,filename):
  76. """获取CSV文件中的_id字段的最后一个值"""
  77. if filepath[-1] != "/":
  78. _filename = "%s%s%s"%(filepath,"/",filename)
  79. else:
  80. _filename = "%s%s" % (filepath, filename)
  81. data = pd.read_csv(_filename)
  82. return data['_id'].values[-1]
  83. def datafileSave(data,filepath):
  84. """保存数据
  85. data:str 数据
  86. filepath:str 文件路径
  87. """
  88. data.to_csv(filepath)
  89. return 0
  90. def id2time(object_id):
  91. """将mongodb的_id转化为时间
  92. return:datetime
  93. """
  94. date = time.localtime (int(object_id[:8],16))
  95. # return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timeStamp))
  96. return datetime.datetime (date[0],date[1],date[2],date[3],date[4],date[5])
  97. def id2timeString(object_id):
  98. """将mongodb的_id转化为时间
  99. return:strftime
  100. """
  101. object_id = str(object_id)
  102. date = time.localtime (int(object_id[:8],16))
  103. # return time.strftime("%Y%m%d", time.localtime(date))
  104. return time.strftime("%Y%m%d", date)
  105. def main():
  106. """主函数"""
  107. # 连接参数
  108. IP = "112.74.211.208"
  109. USER = "chenhao"
  110. PASSWD = "ch_123"
  111. DB = "userlog"
  112. TABLE = "userlog"
  113. # 程序接受参数1
  114. try:
  115. argv1 = sys.argv[1]
  116. except:
  117. # exit(1)
  118. pass
  119. # 程序接受参数2
  120. try: # 指定第二个参数,使用从第二个参数指定的时间开始取数据
  121. argv2 = sys.argv[2]
  122. _time = datetime.datetime (int(argv2[:4]),
  123. int(argv2[4:6]),
  124. int(argv2[6:8]),
  125. int(argv2[8:10]),
  126. int(argv2[10:12]),
  127. int(argv2[12:14]))
  128. except: # 没有第二个参数
  129. # 取最新文件
  130. _newFile = newFile (argv1)
  131. # 取最后一个 object_id
  132. _objectId = getLastId (argv1,_newFile)
  133. # 使用最后一个 object_id ,制作检索结束的 object_id 的时间
  134. _time = id2time(_objectId)
  135. # 从MongoDB中读取最后一个ID后一周的数据
  136. _startDate = datetime2objectId (from_datetime=_time)
  137. _endDate = datetime2objectId (from_datetime=_time,span_days=+7)
  138. MongoData = range_search(_startDate, _endDate,IP,USER,PASSWD,DB,TABLE)
  139. # MongoData = range_search (_startDate, _endDate, "127.0.0.1", "root", "root", "test", "app2")
  140. # 保存文件 文件名等于 路径+文件名+时间(yyyymmdd)
  141. fileName = "%s%s%s%s%s"%(argv1,"/","mongoData_",id2timeString(_endDate),".csv")
  142. datafileSave (MongoData, fileName)
  143. if __name__ == '__main__':
  144. pass
  145. # 测试参数
  146. # argv1 = "./dataset/"
  147. #start time
  148. # start_timestamp = object_id_from_datetime(from_datetime=datetime.datetime(2018,10,17,15,0,0))
  149. #end time
  150. # end_timestamp = object_id_from_datetime(from_datetime=datetime.datetime(2018,10,17,18,0,0))
  151. # range_search(start_timestamp, end_timestamp)
  152. main()