mirror of https://github.com/IoTcat/PyOne.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
122 lines
4.8 KiB
122 lines
4.8 KiB
#-*- coding=utf-8 -*- |
|
from header import * |
|
from upload import * |
|
|
|
def CutText(msg, indent=15):
    """Shorten ``msg`` for display by eliding its middle part.

    A string longer than ``indent * 2`` characters is rendered as its first
    ``indent`` characters, a literal ``'...'`` and its last ``indent``
    characters. Anything shorter is returned unchanged.

    :param msg: text to shorten.
    :param indent: number of characters kept at each end of the text.
    :return: the shortened text, or ``msg`` itself when it is short enough.
    """
    if len(msg) <= indent * 2:
        return msg
    # Keep exactly ``indent`` trailing characters. The start index is computed
    # explicitly instead of using ``msg[-indent:]`` because with indent == 0
    # that slice would return the whole string.
    tail_start = len(msg) - indent
    return msg[:indent] + '...' + msg[tail_start:]
|
|
|
def get_upload_tasks(page, per_page=50):
    """Return one page of upload-queue entries for display.

    Entries are read from ``mon_db.upload_queue`` sorted by ``add_time`` in
    descending order. ``page`` is 1-based: page 1 skips nothing, page 2 skips
    the first ``per_page`` entries, and so on.

    :param page: 1-based page number.
    :param per_page: maximum number of entries returned.
    :return: list of dicts with the keys ``localpath``, ``remote``, ``user``,
        ``status`` and ``speed``; both paths are shortened with ``CutText``.
    """
    cursor = mon_db.upload_queue.find({})
    cursor = cursor.sort([('add_time', DESCENDING)])
    cursor = cursor.limit(per_page)
    cursor = cursor.skip((page - 1) * per_page)
    # Build the trimmed-down view of every queue document on this page.
    return [
        {
            'localpath': CutText(entry['localpath']),
            'remote': CutText(entry['remote']),
            'user': entry['user'],
            'status': entry['status'],
            'speed': entry['speed'],
        }
        for entry in cursor
    ]
|
|
|
|
|
def get_upload_tasks_no():
    """Return the total number of entries in ``mon_db.upload_queue``."""
    every_task = mon_db.upload_queue.find({})
    return every_task.count()
|
|
|
|
|
def StartUploadQueue():
    """Upload all pending queue entries with up to five worker threads.

    Every ``mon_db.upload_queue`` document whose status is neither
    ``'file exists!'`` nor ``'上传成功!'`` is queued as a
    ``(localpath, remote, user, id)`` tuple. The call blocks until all
    ``MultiUploadQueue`` workers have finished and then runs
    ``RemoveRepeatFile()``.
    """
    pending_filter = {
        '$and': [
            {'status': {'$ne': 'file exists!'}},
            {'status': {'$ne': '上传成功!'}}
        ]
    }
    pending_docs = mon_db.upload_queue.find(pending_filter)
    jobs = Queue()
    for doc in pending_docs:
        job = (doc['localpath'], doc['remote'], doc['user'], doc['id'])
        jobs.put(job)
    # Never start more workers than there are jobs, and never more than five.
    worker_count = min(5, jobs.qsize())
    workers = []
    while len(workers) < worker_count:
        worker = MultiUploadQueue(jobs)
        worker.start()
        workers.append(worker)
    for worker in workers:
        worker.join()
    # Remove erroneous records once every worker is done.
    RemoveRepeatFile()
|
|
|
|
|
class MultiUploadQueue(Thread):
    """Worker thread that uploads tasks taken from a shared queue.

    Each queue item is a ``(localpath, remote_path, user, id)`` tuple. The
    progress and outcome of every task are written to the ``status`` (and,
    where reported, ``speed``) fields of the ``mon_db.upload_queue`` document
    with the matching ``id``.
    """

    def __init__(self, waiting_queue):
        """Remember the queue of pending tasks shared between the workers.

        :param waiting_queue: queue of ``(localpath, remote_path, user, id)``
            tuples.
        """
        super(MultiUploadQueue, self).__init__()
        self.queue = waiting_queue

    def run(self):
        """Process queued tasks one at a time until the queue is empty."""
        # Imported here so the block does not depend on a new file-level
        # import; the module is named ``Queue`` on Python 2.
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty

        while True:
            # A non-blocking get avoids the race of ``empty()`` followed by a
            # blocking ``get()``: another worker could take the last item in
            # between and leave this thread waiting forever.
            try:
                localpath, remote_path, user, task_id = self.queue.get_nowait()
            except Empty:
                break

            if not os.path.exists(localpath):
                # The local file is missing. The status text is kept as
                # 'file exists!' because StartUploadQueue filters on exactly
                # that value to keep the entry out of later runs.
                new_value = {'status': 'file exists!'}
                mon_db.upload_queue.update_many({'id': task_id}, {'$set': new_value})
                # Skip only this task; the rest of the queue still needs work.
                continue

            cp = '{}:/{}'.format(user, remote_path)
            if mon_db.items.find_one({'path': cp}):
                # The remote path is already indexed, nothing to upload.
                InfoLogger().print_r(u'{} exists!'.format(cp))
                new_value = {'status': 'file exists!'}
                mon_db.upload_queue.update_many({'id': task_id}, {'$set': new_value})
                continue

            self._upload_one(localpath, remote_path, user, task_id)

    def _upload_one(self, localpath, remote_path, user, task_id):
        """Run one upload session and mirror each reported status into the DB.

        Status messages known to come out of the session:
        ``partition upload success``, ``The request has been throttled!``,
        ``partition upload fail! retry``, ``partition upload fail!``,
        ``file exists`` and ``create upload session fail``. Any other message
        is treated as a finished upload.
        """
        _upload_session = Upload_for_server(localpath, remote_path, user)
        while True:
            try:
                new_value = {}
                # Builtin next() works on both Python 2 and Python 3.
                data = next(_upload_session)
                msg = data['status']
                InfoLogger().print_r('{} upload status:{}'.format(localpath, msg))
                if 'partition upload success' in msg:
                    new_value['status'] = msg
                    new_value['speed'] = data.get('speed')
                elif 'The request has been throttled' in msg:
                    new_value['status'] = 'api受限!智能等待30分钟'
                # Must be tested before the plain 'partition upload fail'
                # check below, which is a prefix of this message.
                elif 'partition upload fail! retry' in msg:
                    new_value['status'] = '上传失败,等待重试'
                elif 'partition upload fail' in msg:
                    new_value['status'] = '上传失败,已经超过重试次数'
                    mon_db.upload_queue.find_one_and_update({'id': task_id}, {'$set': new_value})
                    break
                elif 'file exists' in msg:
                    new_value['status'] = '远程文件已存在'
                    mon_db.upload_queue.find_one_and_update({'id': task_id}, {'$set': new_value})
                    break
                elif 'create upload session fail' in msg:
                    new_value['status'] = '创建实例失败!'
                    mon_db.upload_queue.find_one_and_update({'id': task_id}, {'$set': new_value})
                    break
                else:
                    new_value['status'] = '上传成功!'
                    new_value['speed'] = data.get('speed')
                    mon_db.upload_queue.find_one_and_update({'id': task_id}, {'$set': new_value})
                    time.sleep(2)
                    if GetConfig('delete_after_upload') == 'True':
                        os.remove(localpath)
                    break
                # Non-final status: record the progress and keep going.
                mon_db.upload_queue.find_one_and_update({'id': task_id}, {'$set': new_value})
            except Exception:
                # Any failure (including an exhausted session) ends this task;
                # the traceback is logged and the worker moves on.
                ErrorLogger().print_r(traceback.format_exc())
                break
            # NOTE(review): pause between status updates. The source this was
            # reconstructed from had lost its indentation, so confirm this
            # sleep belongs inside the status loop.
            time.sleep(2)
|
|
|