mirror of https://github.com/IoTcat/PyOne.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1001 lines
38 KiB
1001 lines
38 KiB
#-*- coding=utf-8 -*- |
|
import json |
|
import requests |
|
import collections |
|
import sys |
|
reload(sys) |
|
sys.setdefaultencoding('utf-8') |
|
if sys.version_info[0]==3: |
|
import urllib.parse as urllib |
|
else: |
|
import urllib |
|
import os |
|
import re |
|
import time |
|
import shutil |
|
import base64 |
|
import humanize |
|
import StringIO |
|
from dateutil.parser import parse |
|
from Queue import Queue |
|
from threading import Thread |
|
from redis import Redis |
|
from config import * |
|
from pymongo import MongoClient,ASCENDING,DESCENDING |
|
######mongodb |
|
client = MongoClient('localhost',27017) |
|
db=client.three |
|
items=db.items |
|
rd=Redis(host='localhost',port=6379) |
|
|
|
#######授权链接 |
|
LoginUrl=BaseAuthUrl+'/common/oauth2/v2.0/authorize?response_type=code\ |
|
&client_id={client_id}&redirect_uri={redirect_uri}&scope=offline_access%20files.readwrite.all' |
|
OAuthUrl=BaseAuthUrl+'/common/oauth2/v2.0/token' |
|
AuthData='client_id={client_id}&redirect_uri={redirect_uri}&client_secret={client_secret}&code={code}&grant_type=authorization_code' |
|
ReFreshData='client_id={client_id}&redirect_uri={redirect_uri}&client_secret={client_secret}&refresh_token={refresh_token}&grant_type=refresh_token' |
|
|
|
headers={'User-Agent':'ISV|PyOne|PyOne/2.0'} |
|
|
|
def convert2unicode(string): |
|
return string.encode('utf-8') |
|
|
|
def get_value(key,user='A'): |
|
allow_key=['client_secret','client_id'] |
|
#print('get user {}\'s {}'.format(user,key)) |
|
if key not in allow_key: |
|
return u'禁止获取' |
|
config_path=os.path.join(config_dir,'config.py') |
|
with open(config_path,'r') as f: |
|
text=f.read() |
|
kv=re.findall('"{}":{{[\w\W]*}}'.format(user),text)[0] |
|
value=re.findall('"{}":"(.*?)"'.format(key),kv)[0] |
|
return value |
|
|
|
def GetName(id): |
|
key='name:{}'.format(id) |
|
if rd.exists(key): |
|
return rd.get(key) |
|
else: |
|
item=items.find_one({'id':id}) |
|
rd.set(key,item['name']) |
|
return item['name'] |
|
|
|
def GetPath(id): |
|
key='path:{}'.format(id) |
|
if rd.exists(key): |
|
return rd.get(key) |
|
else: |
|
item=items.find_one({'id':id}) |
|
rd.set(key,item['path']) |
|
return item['path'] |
|
|
|
|
|
################################################################################ |
|
###################################授权函数##################################### |
|
################################################################################ |
|
def open_json(filepath): |
|
token=False |
|
with open(filepath,'r') as f: |
|
try: |
|
token=json.load(f) |
|
except: |
|
for i in range(1,10): |
|
try: |
|
token=json.loads(f.read()[:-i]) |
|
except: |
|
token=False |
|
if token!=False: |
|
return token |
|
return token |
|
|
|
def ReFreshToken(refresh_token,user='A'): |
|
client_id=get_value('client_id',user) |
|
client_secret=get_value('client_secret',user) |
|
headers['Content-Type']='application/x-www-form-urlencoded' |
|
data=ReFreshData.format(client_id=client_id,redirect_uri=urllib.quote(redirect_uri),client_secret=client_secret,refresh_token=refresh_token) |
|
url=OAuthUrl |
|
r=requests.post(url,data=data,headers=headers) |
|
return json.loads(r.text) |
|
|
|
|
|
def GetToken(Token_file='token.json',user='A'): |
|
Token_file='{}_{}'.format(user,Token_file) |
|
if os.path.exists(os.path.join(data_dir,Token_file)): |
|
token=open_json(os.path.join(data_dir,Token_file)) |
|
try: |
|
if time.time()>int(token.get('expires_on')): |
|
print 'token timeout' |
|
refresh_token=token.get('refresh_token') |
|
token=ReFreshToken(refresh_token,user) |
|
if token.get('access_token'): |
|
with open(os.path.join(data_dir,Token_file),'w') as f: |
|
json.dump(token,f,ensure_ascii=False) |
|
except: |
|
with open(os.path.join(data_dir,'{}_Atoken.json'.format(user)),'r') as f: |
|
Atoken=json.load(f) |
|
refresh_token=Atoken.get('refresh_token') |
|
token=ReFreshToken(refresh_token,user) |
|
token['expires_on']=str(time.time()+3599) |
|
if token.get('access_token'): |
|
with open(os.path.join(data_dir,Token_file),'w') as f: |
|
json.dump(token,f,ensure_ascii=False) |
|
return token.get('access_token') |
|
else: |
|
return False |
|
|
|
|
|
def GetAppUrl(): |
|
return 'https://graph.microsoft.com/' |
|
|
|
################################################################################ |
|
###############################onedrive操作函数################################# |
|
################################################################################ |
|
def GetExt(name): |
|
try: |
|
return name.split('.')[-1] |
|
except: |
|
return 'file' |
|
|
|
def date_to_char(date): |
|
return date.strftime('%Y/%m/%d') |
|
|
|
def Dir(path=u'A:/'): |
|
app_url=GetAppUrl() |
|
user,n_path=path.split(':') |
|
print('update {}\'s file'.format(user)) |
|
if n_path=='/': |
|
BaseUrl=app_url+u'v1.0/me/drive/root/children?expand=thumbnails' |
|
# items.remove() |
|
queue=Queue() |
|
# queue.put(dict(url=BaseUrl,grandid=grandid,parent=parent,trytime=1)) |
|
g=GetItemThread(queue,user) |
|
g.GetItem(BaseUrl) |
|
queue=g.queue |
|
if queue.qsize()==0: |
|
return |
|
tasks=[] |
|
for i in range(min(5,queue.qsize())): |
|
t=GetItemThread(queue,user) |
|
t.start() |
|
tasks.append(t) |
|
for t in tasks: |
|
t.join() |
|
RemoveRepeatFile() |
|
else: |
|
grandid=0 |
|
parent='' |
|
if n_path.endswith('/'): |
|
n_path=n_path[:-1] |
|
if not n_path.startswith('/'): |
|
n_path='/'+n_path |
|
n_path=urllib.quote(n_path) |
|
BaseUrl=app_url+u'v1.0/me/drive/root:{}:/children?expand=thumbnails'.format(n_path) |
|
queue=Queue() |
|
# queue.put(dict(url=BaseUrl,grandid=grandid,parent=parent,trytime=1)) |
|
g=GetItemThread(queue,user) |
|
g.GetItem(BaseUrl,grandid,parent,1) |
|
queue=g.queue |
|
if queue.qsize()==0: |
|
return |
|
tasks=[] |
|
for i in range(min(10,queue.qsize())): |
|
t=GetItemThread(queue,user) |
|
t.start() |
|
tasks.append(t) |
|
for t in tasks: |
|
t.join() |
|
RemoveRepeatFile() |
|
|
|
def Dir_all(path=u'A:/'): |
|
app_url=GetAppUrl() |
|
user,n_path=path.split(':') |
|
print('update {}\'s {} file'.format(user,n_path)) |
|
if n_path=='/': |
|
BaseUrl=app_url+u'v1.0/me/drive/root/children?expand=thumbnails' |
|
items.remove({'user':user}) |
|
queue=Queue() |
|
g=GetItemThread(queue,user) |
|
g.GetItem(BaseUrl) |
|
queue=g.queue |
|
if queue.qsize()==0: |
|
return |
|
tasks=[] |
|
for i in range(min(5,queue.qsize())): |
|
t=GetItemThread(queue,user) |
|
t.start() |
|
tasks.append(t) |
|
for t in tasks: |
|
t.join() |
|
RemoveRepeatFile() |
|
else: |
|
grandid=0 |
|
parent='' |
|
if n_path.endswith('/'): |
|
n_path=n_path[:-1] |
|
if not n_path.startswith('/'): |
|
n_path='/'+n_path |
|
if items.find_one({'grandid':0,'type':'folder','user':user}): |
|
parent_id=0 |
|
for idx,p in enumerate(n_path[1:].split('/')): |
|
if parent_id==0: |
|
parent_id=items.find_one({'name':p,'grandid':idx,'user':user})['id'] |
|
else: |
|
parent_id=items.find_one({'name':p,'grandid':idx,'parent':parent_id})['id'] |
|
items.delete_many({'parent':parent_id}) |
|
grandid=idx+1 |
|
parent=parent_id |
|
n_path=urllib.quote(n_path) |
|
BaseUrl=app_url+u'v1.0/me/drive/root:{}:/children?expand=thumbnails'.format(n_path) |
|
queue=Queue() |
|
g=GetItemThread(queue,user) |
|
g.GetItem(BaseUrl,grandid,parent,1) |
|
queue=g.queue |
|
if queue.qsize()==0: |
|
return |
|
tasks=[] |
|
for i in range(min(10,queue.qsize())): |
|
t=GetItemThread(queue,user) |
|
t.start() |
|
tasks.append(t) |
|
for t in tasks: |
|
t.join() |
|
RemoveRepeatFile() |
|
|
|
class GetItemThread(Thread): |
|
def __init__(self,queue,user): |
|
super(GetItemThread,self).__init__() |
|
self.queue=queue |
|
self.user=user |
|
share_path=od_users.get(user).get('share_path') |
|
if share_path=='/': |
|
self.share_path=share_path |
|
else: |
|
sp=share_path |
|
if not sp.startswith('/'): |
|
sp='/'+share_path |
|
if sp.endswith('/') and sp!='/': |
|
sp=sp[:-1] |
|
self.share_path=sp |
|
|
|
def run(self): |
|
while 1: |
|
time.sleep(0.5) #避免过快 |
|
info=self.queue.get() |
|
url=info['url'] |
|
grandid=info['grandid'] |
|
parent=info['parent'] |
|
trytime=info['trytime'] |
|
self.GetItem(url,grandid,parent,trytime) |
|
if self.queue.empty(): |
|
time.sleep(5) #再等5s |
|
print('waiting 5s if queue is not empty') |
|
if self.queue.empty(): |
|
break |
|
|
|
def GetItem(self,url,grandid=0,parent='',trytime=1): |
|
app_url=GetAppUrl() |
|
token=GetToken(user=self.user) |
|
print(u'getting files from url {}'.format(url)) |
|
header={'Authorization': 'Bearer {}'.format(token)} |
|
try: |
|
r=requests.get(url,headers=header) |
|
data=json.loads(r.content) |
|
if data.get('error'): |
|
print('error:{}! waiting 180s'.format(data.get('error').get('message'))) |
|
time.sleep(180) |
|
self.queue.put(dict(url=url,grandid=grandid,parent=parent,trytime=trytime)) |
|
return |
|
values=data.get('value') |
|
if len(values)>0: |
|
for value in values: |
|
item={} |
|
if value.get('folder'): |
|
folder=items.find_one({'id':value['id']}) |
|
if folder is not None: |
|
if folder['size_order']==value['size']: #文件夹大小未变化,不更新 |
|
print(u'path:{},origin size:{},current size:{}'.format(value['name'],folder['size_order'],value['size'])) |
|
else: |
|
items.delete_one({'id':value['id']}) |
|
item['type']='folder' |
|
item['user']=self.user |
|
item['order']=0 |
|
item['name']=convert2unicode(value['name']) |
|
item['id']=convert2unicode(value['id']) |
|
item['size']=humanize.naturalsize(value['size'], gnu=True) |
|
item['size_order']=int(value['size']) |
|
item['lastModtime']=date_to_char(parse(value['lastModifiedDateTime'])) |
|
item['grandid']=grandid |
|
item['parent']=parent |
|
grand_path=value.get('parentReference').get('path').replace('/drive/root:','') |
|
if grand_path=='': |
|
path=convert2unicode(value['name']) |
|
else: |
|
path=grand_path.replace(self.share_path,'',1)+'/'+convert2unicode(value['name']) |
|
if path.startswith('/') and path!='/': |
|
path=path[1:] |
|
if path=='': |
|
path=convert2unicode(value['name']) |
|
path='{}:/{}'.format(self.user,path) |
|
item['path']=path |
|
subfodler=items.insert_one(item) |
|
if value.get('folder').get('childCount')==0: |
|
continue |
|
else: |
|
url=app_url+'v1.0/me'+value.get('parentReference').get('path')+'/'+value.get('name')+':/children?expand=thumbnails' |
|
self.queue.put(dict(url=url,grandid=grandid+1,parent=item['id'],trytime=1)) |
|
else: |
|
if items.find_one({'id':value['id']}) is not None: #文件存在 |
|
continue |
|
else: |
|
item['type']=GetExt(value['name']) |
|
grand_path=value.get('parentReference').get('path').replace('/drive/root:','') |
|
if grand_path=='': |
|
path=convert2unicode(value['name']) |
|
else: |
|
path=grand_path.replace(self.share_path,'',1)+'/'+convert2unicode(value['name']) |
|
if path.startswith('/') and path!='/': |
|
path=path[1:] |
|
if path=='': |
|
path=convert2unicode(value['name']) |
|
path='{}:/{}'.format(self.user,path) |
|
item['path']=path |
|
item['user']=self.user |
|
item['name']=convert2unicode(value['name']) |
|
item['id']=convert2unicode(value['id']) |
|
item['size']=humanize.naturalsize(value['size'], gnu=True) |
|
item['size_order']=int(value['size']) |
|
item['lastModtime']=date_to_char(parse(value['lastModifiedDateTime'])) |
|
item['grandid']=grandid |
|
item['parent']=parent |
|
if GetExt(value['name']) in ['bmp','jpg','jpeg','png','gif']: |
|
item['order']=3 |
|
key1='name:{}'.format(value['id']) |
|
key2='path:{}'.format(value['id']) |
|
rd.set(key1,value['name']) |
|
rd.set(key2,path) |
|
elif value['name']=='.password': |
|
item['order']=1 |
|
else: |
|
item['order']=2 |
|
items.insert_one(item) |
|
if data.get('@odata.nextLink'): |
|
self.queue.put(dict(url=data.get('@odata.nextLink'),grandid=grandid,parent=parent,trytime=1)) |
|
except Exception as e: |
|
trytime+=1 |
|
print(u'error to opreate GetItem("{}","{}","{}"),try times :{}, reason: {}'.format(url,grandid,parent,trytime,e)) |
|
if trytime<=3: |
|
self.queue.put(dict(url=url,grandid=grandid,parent=parent,trytime=trytime)) |
|
|
|
|
|
def GetItemByPath(self,path): |
|
app_url=GetAppUrl() |
|
token=GetToken(user=self.user) |
|
if path=='' or path=='/': |
|
url=app_url+u'v1.0/me/drive/root/' |
|
if path=='/': |
|
url=app_url+u'v1.0/me/drive/root:{}:/'.format(path) |
|
header={'Authorization': 'Bearer {}'.format(token)} |
|
url=app_url+u'v1.0/me/drive/root:{}:/'.format(path) |
|
r=requests.get(url,headers=header) |
|
data=json.loads(r.content) |
|
return data |
|
|
|
def GetItemByUrl(self,url): |
|
app_url=GetAppUrl() |
|
token=GetToken(user=self.user) |
|
header={'Authorization': 'Bearer {}'.format(token)} |
|
r=requests.get(url,headers=header) |
|
data=json.loads(r.content) |
|
return data |
|
|
|
def GetRootid(user='A'): |
|
key='{}:rootid'.format(user) |
|
if rd.exists(key): |
|
return rd.get(key) |
|
else: |
|
app_url=GetAppUrl() |
|
token=GetToken(user=user) |
|
url=app_url+u'v1.0/me/drive/root/' |
|
header={'Authorization': 'Bearer {}'.format(token)} |
|
r=requests.get(url,headers=header) |
|
data=json.loads(r.content) |
|
rd.set(key,data['id'],3600) |
|
return data['id'] |
|
|
|
def UpdateFile(renew='all'): |
|
if renew=='all': |
|
items.remove() |
|
rd.flushdb() |
|
for user,item in od_users.items(): |
|
if item.get('client_id')!='': |
|
share_path='{}:{}'.format(user,item['share_path']) |
|
Dir_all(share_path) |
|
else: |
|
for user,item in od_users.items(): |
|
if item.get('client_id')!='': |
|
share_path='{}:{}'.format(user,item['share_path']) |
|
Dir(share_path) |
|
print('update file success!') |
|
|
|
|
|
def FileExists(filename,user='A'): |
|
token=GetToken(user=user) |
|
headers={'Authorization':'bearer {}'.format(token),'Content-Type':'application/json'} |
|
search_url=app_url+"v1.0/me/drive/root/search(q='{}')".format(filename) |
|
r=requests.get(search_url,headers=headers) |
|
jsondata=json.loads(r.text) |
|
if len(jsondata['value'])==0: |
|
return False |
|
else: |
|
return True |
|
|
|
def FileInfo(fileid,user='A'): |
|
token=GetToken(user=user) |
|
headers={'Authorization':'bearer {}'.format(token),'Content-Type':'application/json'} |
|
search_url=app_url+"v1.0/me/drive/items/{}".format(fileid) |
|
r=requests.get(search_url,headers=headers) |
|
jsondata=json.loads(r.text) |
|
return jsondata |
|
|
|
|
|
################################################上传文件 |
|
def list_all_files(rootdir): |
|
import os |
|
_files = [] |
|
if len(re.findall('[:#\|\?]+',rootdir))>0: |
|
newf=re.sub('[:#\|\?]+','',rootdir) |
|
shutil.move(rootdir,newf) |
|
rootdir=newf |
|
if rootdir.endswith(' '): |
|
shutil.move(rootdir,rootdir.rstrip()) |
|
rootdir=rootdir.rstrip() |
|
if len(re.findall('/ ',rootdir))>0: |
|
newf=re.sub('/ ','/',rootdir) |
|
shutil.move(rootdir,newf) |
|
rootdir=newf |
|
flist = os.listdir(rootdir) #列出文件夹下所有的目录与文件 |
|
for f in flist: |
|
path = os.path.join(rootdir,f) |
|
if os.path.isdir(path): |
|
_files.extend(list_all_files(path)) |
|
if os.path.isfile(path): |
|
_files.append(path) |
|
return _files |
|
|
|
def _filesize(path): |
|
size=os.path.getsize(path) |
|
# print('{}\'s size {}'.format(path,size)) |
|
return size |
|
|
|
def _file_content(path,offset,length): |
|
size=_filesize(path) |
|
offset,length=map(int,(offset,length)) |
|
if offset>size: |
|
print('offset must smaller than file size') |
|
return False |
|
length=length if offset+length<size else size-offset |
|
endpos=offset+length-1 if offset+length<size else size-1 |
|
# print("read file {} from {} to {}".format(path,offset,endpos)) |
|
with open(path,'rb') as f: |
|
f.seek(offset) |
|
content=f.read(length) |
|
return content |
|
|
|
|
|
|
|
def _upload(filepath,remote_path,user='A'): #remote_path like 'share/share.mp4' |
|
token=GetToken(user=user) |
|
headers={'Authorization':'bearer {}'.format(token)} |
|
url=app_url+'v1.0/me/drive/root:'+urllib.quote(remote_path)+':/content' |
|
r=requests.put(url,headers=headers,data=open(filepath,'rb')) |
|
data=json.loads(r.content) |
|
trytime=1 |
|
while 1: |
|
try: |
|
if data.get('error'): |
|
print(data.get('error').get('message')) |
|
yield {'status':'upload fail!'} |
|
break |
|
elif r.status_code==201 or r.status_code==200: |
|
print('upload {} success!'.format(filepath)) |
|
AddResource(data,user) |
|
yield {'status':'upload success!'} |
|
break |
|
else: |
|
print(data) |
|
yield {'status':'upload fail!'} |
|
break |
|
except Exception as e: |
|
trytime+=1 |
|
print('error to opreate _upload("{}","{}"), try times {},error:{}'.format(filepath,remote_path,trytime,e)) |
|
yield {'status':'upload fail! retry!'} |
|
if trytime>3: |
|
yield {'status':'upload fail! touch max retry time(3)'} |
|
break |
|
|
|
def _upload_part(uploadUrl, filepath, offset, length,trytime=1): |
|
size=_filesize(filepath) |
|
offset,length=map(int,(offset,length)) |
|
if offset>size: |
|
print('offset must smaller than file size') |
|
return {'status':'fail','msg':'params mistake','code':1} |
|
length=length if offset+length<size else size-offset |
|
endpos=offset+length-1 if offset+length<size else size-1 |
|
print('upload file {} {}%'.format(filepath,round(float(endpos)/size*100,1))) |
|
filebin=_file_content(filepath,offset,length) |
|
headers={} |
|
# headers['Authorization']='bearer {}'.format(token) |
|
headers['Content-Length']=str(length) |
|
headers['Content-Range']='bytes {}-{}/{}'.format(offset,endpos,size) |
|
try: |
|
r=requests.put(uploadUrl,headers=headers,data=filebin) |
|
data=json.loads(r.content) |
|
if r.status_code==201 or r.status_code==200: |
|
print(u'{} upload success!'.format(filepath)) |
|
return {'status':'success','msg':'all upload success','code':0,'info':data} |
|
elif r.status_code==202: |
|
offset=data.get('nextExpectedRanges')[0].split('-')[0] |
|
return {'status':'success','msg':'partition upload success','code':1,'offset':offset} |
|
else: |
|
trytime+=1 |
|
if trytime<=3: |
|
return {'status':'fail' |
|
,'msg':'please retry' |
|
,'sys_msg':data.get('error').get('message') |
|
,'code':2,'trytime':trytime} |
|
else: |
|
return {'status':'fail' |
|
,'msg':'retry times limit' |
|
,'sys_msg':data.get('error').get('message') |
|
,'code':3} |
|
except Exception as e: |
|
print('error to opreate _upload_part("{}","{}","{}","{}"), try times {},reason:{}'.format(uploadUrl, filepath, offset, length,trytime,e)) |
|
trytime+=1 |
|
if trytime<=3: |
|
return {'status':'fail','msg':'please retry','code':2,'trytime':trytime,'sys_msg':''} |
|
else: |
|
return {'status':'fail','msg':'retry times limit','code':3,'sys_msg':''} |
|
|
|
|
|
def _GetAllFile(parent_id="",parent_path="",filelist=[]): |
|
for f in db.items.find({'parent':parent_id}): |
|
if f['type']=='folder': |
|
_GetAllFile(f['id'],'/'.join([parent_path,f['name']]),filelist) |
|
else: |
|
fp='/'.join([parent_path,f['name']]) |
|
if fp.startswith('/'): |
|
fp=base64.b64encode(fp[1:].encode('utf-8')) |
|
else: |
|
fp=base64.b64encode(fp.encode('utf-8')) |
|
filelist.append(fp) |
|
return filelist |
|
|
|
|
|
def AddResource(data,user='A'): |
|
#检查父文件夹是否在数据库,如果不在则获取添加 |
|
grand_path=data.get('parentReference').get('path').replace('/drive/root:','') |
|
share_path=od_users.get(user).get('share_path') |
|
if grand_path=='': |
|
parent_id='' |
|
grandid=0 |
|
else: |
|
g=GetItemThread(Queue(),user) |
|
parent_id=data.get('parentReference').get('id') |
|
grandid=len(data.get('parentReference').get('path').replace('/drive/root:','').split('/'))-1 |
|
grand_path=grand_path[1:] |
|
parent_path='/' |
|
pid='' |
|
for idx,p in enumerate(grand_path.split('/')): |
|
parent=items.find_one({'name':p,'grandid':idx,'parent':pid}) |
|
if parent is not None: |
|
pid=parent['id'] |
|
parent_path='/'.join([parent_path,parent['name']]) |
|
else: |
|
parent_path='/'.join([parent_path,p]) |
|
fdata=g.GetItemByPath(parent_path) |
|
path=user+':/'+parent_path.replace('///','/') |
|
path=path.replace('///','/').replace('//','/') |
|
item={} |
|
item['type']='folder' |
|
item['user']=user |
|
item['order']=0 |
|
item['name']=fdata.get('name') |
|
item['id']=fdata.get('id') |
|
item['size']=humanize.naturalsize(fdata.get('size'), gnu=True) |
|
item['size_order']=fdata.get('size') |
|
item['lastModtime']=date_to_char(parse(fdata['lastModifiedDateTime'])) |
|
item['grandid']=idx |
|
item['parent']=pid |
|
item['path']=path |
|
items.insert_one(item) |
|
pid=fdata.get('id') |
|
#插入数据 |
|
item={} |
|
item['type']='file' |
|
item['name']=data.get('name') |
|
item['user']=user |
|
item['id']=data.get('id') |
|
item['size']=humanize.naturalsize(data.get('size'), gnu=True) |
|
item['size_order']=data.get('size') |
|
item['lastModtime']=date_to_char(parse(data.get('lastModifiedDateTime'))) |
|
item['grandid']=grandid |
|
item['parent']=parent_id |
|
if grand_path=='': |
|
path=convert2unicode(data['name']) |
|
else: |
|
path=grand_path.replace(share_path,'',1)+'/'+convert2unicode(data['name']) |
|
if path.startswith('/') and path!='/': |
|
path=path[1:] |
|
if path=='': |
|
path=convert2unicode(data['name']) |
|
item['path']=user+':/'+path |
|
if GetExt(data['name']) in ['bmp','jpg','jpeg','png','gif']: |
|
item['order']=3 |
|
key1='name:{}'.format(data['id']) |
|
key2='path:{}'.format(data['id']) |
|
rd.set(key1,data['name']) |
|
rd.set(key2,path) |
|
elif data['name']=='.password': |
|
item['order']=1 |
|
else: |
|
item['order']=2 |
|
items.insert_one(item) |
|
|
|
|
|
def CreateUploadSession(path,user='A'): |
|
token=GetToken(user=user) |
|
headers={'Authorization':'bearer {}'.format(token),'Content-Type':'application/json'} |
|
url=app_url+'v1.0/me/drive/root:'+urllib.quote(path)+':/createUploadSession' |
|
data={ |
|
"item": { |
|
"@microsoft.graph.conflictBehavior": "rename", |
|
} |
|
} |
|
try: |
|
r=requests.post(url,headers=headers,data=json.dumps(data)) |
|
retdata=json.loads(r.content) |
|
if r.status_code==409: |
|
print('file exists') |
|
return False |
|
else: |
|
return retdata |
|
except Exception as e: |
|
print('error to opreate CreateUploadSession("{}"),reason {}'.format(path,e)) |
|
return False |
|
|
|
def UploadSession(uploadUrl, filepath,user): |
|
length=327680*10 |
|
offset=0 |
|
trytime=1 |
|
filesize=_filesize(filepath) |
|
while 1: |
|
result=_upload_part(uploadUrl, filepath, offset, length,trytime=trytime) |
|
code=result['code'] |
|
#上传完成 |
|
if code==0: |
|
AddResource(result['info'],user) |
|
yield {'status':'upload success!'} |
|
break |
|
#分片上传成功 |
|
elif code==1: |
|
trytime=1 |
|
offset=result['offset'] |
|
per=round((float(offset)/filesize)*100,1) |
|
yield {'status':'partition upload success! {}%'.format(per)} |
|
#错误,重试 |
|
elif code==2: |
|
if result['sys_msg']=='The request has been throttled': |
|
print(result['sys_msg']+' ; wait for 1800s') |
|
yield {'status':'The request has been throttled! wait for 1800s'} |
|
time.sleep(1800) |
|
offset=offset |
|
trytime=result['trytime'] |
|
yield {'status':'partition upload fail! retry!'} |
|
#重试超过3次,放弃 |
|
elif code==3: |
|
yield {'status':'partition upload fail! touch max retry times!'} |
|
break |
|
|
|
|
|
|
|
def Upload_for_server(filepath,remote_path=None,user='A'): |
|
token=GetToken(user=user) |
|
headers={'Authorization':'bearer {}'.format(token),'Content-Type':'application/json'} |
|
if remote_path is None: |
|
remote_path=os.path.basename(filepath) |
|
if remote_path.endswith('/'): |
|
remote_path=os.path.join(remote_path,os.path.basename(filepath)) |
|
if not remote_path.startswith('/'): |
|
remote_path='/'+remote_path |
|
print('local file path:{}, remote file path:{}'.format(filepath,remote_path)) |
|
if _filesize(filepath)<1024*1024*3.25: |
|
for msg in _upload(filepath,remote_path,user): |
|
yield msg |
|
else: |
|
session_data=CreateUploadSession(remote_path,user) |
|
if session_data==False: |
|
yield {'status':'file exists!'} |
|
else: |
|
if session_data.get('uploadUrl'): |
|
uploadUrl=session_data.get('uploadUrl') |
|
for msg in UploadSession(uploadUrl,filepath,user): |
|
yield msg |
|
else: |
|
print(session_data.get('error').get('msg')) |
|
print('create upload session fail! {}'.format(remote_path)) |
|
yield {'status':'create upload session fail!'} |
|
|
|
def Upload(filepath,remote_path=None,user='A'): |
|
token=GetToken(user=user) |
|
headers={'Authorization':'bearer {}'.format(token),'Content-Type':'application/json'} |
|
if remote_path is None: |
|
remote_path=os.path.basename(filepath) |
|
if remote_path.endswith('/'): |
|
remote_path=os.path.join(remote_path,os.path.basename(filepath)) |
|
if not remote_path.startswith('/'): |
|
remote_path='/'+remote_path |
|
if _filesize(filepath)<1024*1024*3.25: |
|
for msg in _upload(filepath,remote_path,user): |
|
1 |
|
else: |
|
session_data=CreateUploadSession(remote_path,user) |
|
if session_data==False: |
|
return {'status':'file exists!'} |
|
else: |
|
if session_data.get('uploadUrl'): |
|
uploadUrl=session_data.get('uploadUrl') |
|
for msg in UploadSession(uploadUrl,filepath,user): |
|
1 |
|
else: |
|
print(session_data.get('error').get('msg')) |
|
print('create upload session fail! {}'.format(remote_path)) |
|
return {'status':'create upload session fail!'} |
|
|
|
|
|
class MultiUpload(Thread): |
|
def __init__(self,waiting_queue,user): |
|
super(MultiUpload,self).__init__() |
|
self.queue=waiting_queue |
|
self.user=user |
|
|
|
def run(self): |
|
while not self.queue.empty(): |
|
localpath,remote_dir=self.queue.get() |
|
Upload(localpath,remote_dir,self.user) |
|
|
|
|
|
def UploadDir(local_dir,remote_dir,user,threads=5): |
|
print(u'geting file from dir {}'.format(local_dir)) |
|
localfiles=list_all_files(local_dir) |
|
print(u'get {} files from dir {}'.format(len(localfiles),local_dir)) |
|
print(u'check filename') |
|
for f in localfiles: |
|
dir_,fname=os.path.dirname(f),os.path.basename(f) |
|
if len(re.findall('[:/#\|]+',fname))>0: |
|
newf=os.path.join(dir_,re.sub('[:/#\|]+','',fname)) |
|
shutil.move(f,newf) |
|
localfiles=list_all_files(local_dir) |
|
check_file_list=[] |
|
if local_dir.endswith('/'): |
|
local_dir=local_dir[:-1] |
|
for file in localfiles: |
|
dir_,fname=os.path.dirname(file),os.path.basename(file) |
|
remote_path=remote_dir+'/'+dir_.replace(local_dir,'')+'/'+fname |
|
remote_path=remote_path.replace('//','/') |
|
check_file_list.append((remote_path,file)) |
|
print(u'check repeat file') |
|
if remote_dir=='/': |
|
cloud_files=_GetAllFile() |
|
else: |
|
if remote_dir.startswith('/'): |
|
remote_dir=remote_dir[1:] |
|
if items.find_one({'grandid':0,'type':'folder','name':remote_dir.split('/')[0]}): |
|
parent_id=0 |
|
parent_path='' |
|
for idx,p in enumerate(remote_dir.split('/')): |
|
if parent_id==0: |
|
parent=items.find_one({'name':p,'grandid':idx}) |
|
parent_id=parent['id'] |
|
parent_path='/'.join([parent_path,parent['name']]) |
|
else: |
|
parent=items.find_one({'name':p,'grandid':idx,'parent':parent_id}) |
|
parent_id=parent['id'] |
|
parent_path='/'.join([parent_path,parent['name']]) |
|
grandid=idx+1 |
|
cloud_files=_GetAllFile(parent_id,parent_path) |
|
try: |
|
cloud_files=dict([(i,i) for i in cloud_files]) |
|
except: |
|
cloud_files={} |
|
queue=Queue() |
|
tasks=[] |
|
for remote_path,file in check_file_list: |
|
if not cloud_files.get(base64.b64encode(remote_path)): |
|
queue.put((file,remote_path)) |
|
print "check_file_list {},cloud_files {},queue {}".format(len(check_file_list),len(cloud_files),queue.qsize()) |
|
print "start upload files 5s later" |
|
time.sleep(5) |
|
for i in range(min(threads,queue.qsize())): |
|
t=MultiUpload(queue,user) |
|
t.start() |
|
tasks.append(t) |
|
for t in tasks: |
|
t.join() |
|
#删除错误数据 |
|
RemoveRepeatFile() |
|
|
|
|
|
|
|
########################删除文件 |
|
def DeleteLocalFile(fileid): |
|
items.remove({'id':fileid}) |
|
|
|
def DeleteRemoteFile(fileid,user='A'): |
|
app_url=GetAppUrl() |
|
token=GetToken(user=user) |
|
headers={'Authorization':'bearer {}'.format(token)} |
|
url=app_url+'v1.0/me/drive/items/'+fileid |
|
r=requests.delete(url,headers=headers) |
|
if r.status_code==204: |
|
DeleteLocalFile(fileid) |
|
return True |
|
else: |
|
DeleteLocalFile(fileid) |
|
return False |
|
|
|
######################## |
|
def CreateFolder(folder_name,grand_path,user='A'): |
|
app_url=GetAppUrl() |
|
token=GetToken(user=user) |
|
if grand_path=='' or grand_path is None or grand_path=='/': |
|
url=app_url+'v1.0/me/drive/root/children' |
|
parent_id='' |
|
grandid=0 |
|
else: |
|
path='{}:/{}'.format(user,grand_path) |
|
parent=items.find_one({'path':path}) |
|
parent_id=parent['id'] |
|
grandid=parent['grandid']+1 |
|
url=app_url+'v1.0/me/drive/items/{}/children'.format(parent['id']) |
|
headers={'Authorization':'bearer {}'.format(token),'Content-Type':'application/json'} |
|
payload={ |
|
"name": folder_name, |
|
"folder": {}, |
|
"@microsoft.graph.conflictBehavior": "rename" |
|
} |
|
r=requests.post(url,headers=headers,data=json.dumps(payload)) |
|
data=json.loads(r.content) |
|
if data.get('id'): |
|
#插入数据 |
|
share_path=od_users.get(user).get('share_path') |
|
item={} |
|
item['type']='folder' |
|
item['user']=user |
|
item['name']=data.get('name') |
|
item['id']=data.get('id') |
|
item['size']=humanize.naturalsize(data.get('size'), gnu=True) |
|
item['size_order']=data.get('size') |
|
item['lastModtime']=date_to_char(parse(data.get('lastModifiedDateTime'))) |
|
item['grandid']=grandid |
|
item['parent']=parent_id |
|
if grand_path=='' or grand_path is None or grand_path=='/': |
|
path=convert2unicode(data['name']) |
|
else: |
|
path=grand_path.replace(share_path,'',1)+'/'+convert2unicode(data['name']) |
|
if not path.startswith('/'): |
|
path='/'+path |
|
path='{}:{}'.format(user,path) |
|
item['path']=path |
|
item['order']=0 |
|
items.insert_one(item) |
|
return True |
|
else: |
|
print(data.get('error').get('msg')) |
|
return False |
|
|
|
def MoveFile(fileid,new_folder_path,user='A'): |
|
app_url=GetAppUrl() |
|
token=GetToken(user=user) |
|
#GetRootid |
|
if new_folder_path=='' or new_folder_path is None or new_folder_path=='/': |
|
folder_id=GetRootid(user) |
|
parent='' |
|
grandid=0 |
|
path=GetName(fileid) |
|
else: |
|
path='{}:/{}'.format(user,new_folder_path) |
|
parent_item=items.find_one({'path':path}) |
|
folder_id=parent_item['id'] |
|
parent=parent_item['id'] |
|
grandid=parent_item['grandid']+1 |
|
path=parent_item['path']+'/'+GetName(fileid) |
|
url=app_url+'v1.0/me/drive/items/{}'.format(fileid) |
|
headers={'Authorization':'bearer {}'.format(token),'Content-Type':'application/json'} |
|
payload={ |
|
"parentReference": { |
|
"id": folder_id |
|
}, |
|
"name": GetName(fileid) |
|
} |
|
r=requests.patch(url,headers=headers,data=json.dumps(payload)) |
|
data=json.loads(r.content) |
|
if data.get('id'): |
|
new_value={'parent':parent,'grandid':grandid,'path':path} |
|
items.find_one_and_update({'id':fileid},{'$set':new_value}) |
|
file=items.find_one({'id':fileid}) |
|
filename=file['name'] |
|
if file['parent']=='': |
|
path='/' |
|
else: |
|
path=items.find_one({'id':file['parent']})['path'] |
|
key='has_item$#$#$#$#{}$#$#$#$#{}'.format(path,filename) |
|
rd.delete(key) |
|
return True |
|
else: |
|
print(data.get('error').get('msg')) |
|
return False |
|
|
|
def CheckTimeOut(fileid): |
|
app_url=GetAppUrl() |
|
token=GetToken() |
|
headers={'Authorization':'bearer {}'.format(token),'Content-Type':'application/json'} |
|
url=app_url+'v1.0/me/drive/items/'+fileid |
|
r=requests.get(url,headers=headers) |
|
data=json.loads(r.content) |
|
if data.get('@microsoft.graph.downloadUrl'): |
|
downloadUrl=data.get('@microsoft.graph.downloadUrl') |
|
start_time=time.time() |
|
for i in range(10000): |
|
r=requests.head(downloadUrl) |
|
print '{}\'s gone, status:{}'.format(time.time()-start_time,r.status_code) |
|
if r.status_code==404: |
|
break |
|
|
|
def RemoveRepeatFile(): |
|
""" |
|
db.items.aggregate([ |
|
{ |
|
$group:{_id:{id:'$id'},count:{$sum:1},dups:{$addToSet:'$_id'}} |
|
}, |
|
{ |
|
$match:{count:{$gt:1}} |
|
} |
|
|
|
]).forEach(function(it){ |
|
|
|
it.dups.shift(); |
|
db.items.remove({_id: {$in: it.dups}}); |
|
|
|
}); |
|
""" |
|
deleteData=items.aggregate([ |
|
{'$group': { |
|
'_id': { 'id': "$id"}, |
|
'uniqueIds': { '$addToSet': "$_id" }, |
|
'count': { '$sum': 1 } |
|
}}, |
|
{ '$match': { |
|
'count': { '$gt': 1 } |
|
}} |
|
]); |
|
first=True |
|
try: |
|
for d in deleteData: |
|
first=True |
|
for did in d['uniqueIds']: |
|
if not first: |
|
items.delete_one({'_id':did}); |
|
first=False |
|
except Exception as e: |
|
print(e) |
|
return |
|
|
|
|
|
if __name__=='__main__': |
|
func=sys.argv[1] |
|
if len(sys.argv)>2: |
|
args=sys.argv[2:] |
|
eval(func+str(tuple(args))) |
|
else: |
|
eval(func+'()')
|
|
|