Model (models.py)
class File(models.Model):
    """Document uploaded on behalf of a staff member and stored on Qiniu."""

    # Display name of the document as supplied by the uploader.
    name = models.CharField(max_length=255)
    # Staff member the document belongs to. ``on_delete`` is mandatory on
    # Django >= 2.0; CASCADE matches the implicit default of older versions.
    staff = models.ForeignKey('Staff', on_delete=models.CASCADE)
    # Object key under which the file is stored in the Qiniu bucket.
    qiniu_name = models.CharField(max_length=255)
    # Free-form note attached to the document; empty by default.
    remark = models.CharField(max_length=255, default='')
Form (forms.py)
class UploadStafffile(forms.Form):
    """Upload payload: document name, staff id, remark and the file itself."""

    name = forms.CharField()
    staff = forms.IntegerField()
    remark = forms.CharField()
    data = forms.FileField()


class StaffAllfile(forms.Form):
    """Query payload: id of the staff member whose documents are listed."""

    id = forms.IntegerField()


class DownloadStafffile(forms.Form):
    """Download payload: Qiniu object key of the requested document."""

    # The field must be *instantiated*; a bare ``forms.CharField`` class is not
    # registered as a form field and never reaches ``cleaned_data``.
    qiniu_name = forms.CharField()
View (views.py)
# -*- coding: utf-8 -*-from __future__ import unicode_literalsimport base64import datetimeimport jsonimport randomimport stringimport requestsfrom django.http import HttpResponse, HttpResponseBadRequestfrom django.shortcuts import redirectfrom qiniu import Authfrom qiniu.utils import urlsafe_base64_encodefrom basic import models, formsQINIU = {'access_key': 'SDbdxmtksVbzc6iZHh0YDX6yFbxIMegm_yqP0wqu', ????????'secret_key': 'cqhR4eMnt_-M0l2NpiBEE02RB9guJxRl_0H3000G', 'bucket_name': 'schoolsalary'}# 上传自命名文档def upload_staff_file(request): ???form = forms.UploadStafffile(request.POST) ???if not form.is_valid(): ???????e = ','.join([form.errors[i][0] for i in form.errors]) if len(form.errors) > 0 else u'未知错误' ???????return HttpResponseBadRequest(json.dumps({'code': 'false', 'msg': e, 'data': []}), ?????????????????????????????????????content_type='application/json') ???q = Auth(QINIU['access_key'], QINIU['secret_key']) ???file_name = form.cleaned_data['name'] ???staff_id = form.cleaned_data['staff'] ???remark = form.cleaned_data['remark'] ???data_file = form.cleaned_data['data'] ?# reuqest.FILES.get('data') ???if not models.Staff.objects.filter(id=staff_id).exists(): ???????return HttpResponseBadRequest(json.dumps({'code': 'false', 'msg': '上传文件的员工不存在', 'data': []}), ?????????????????????????????????????content_type='application/json') ???key = ''.join(random.sample(string.ascii_letters + string.digits, 8)) ?# 随机八位字符 ???while models.File.objects.filter(qiniu_name=key): ?# 假如随机的八位字符已经被使用 ???????key = ''.join(random.sample(string.ascii_letters + string.digits, 8)) ?# 新随机八位字符 ???token = q.upload_token(QINIU['bucket_name'], key, 3600) ???base64_file = base64.b64encode(data_file.read()) ???qiniu_upload_url = "http://up-z2.qiniu.com/putb64/%s/key/%s/mimeType/%s" % ???????????????????????(str(-1), urlsafe_base64_encode(key), urlsafe_base64_encode(file_name.split('.')[-1])) ???headers = {"Content-type": "application/octet-stream", "Authorization": "UpToken " + token} 
???requests.post(qiniu_upload_url, headers=headers, data=base64_file) ???# resp = json.loads(requests.post(qiniu_upload_url, headers=headers, data=base64_file).content) ???# hash = resp['hash'] ??# hash校验 ???# filename = resp['key'] ???file_obj = models.File() ???file_obj.name = file_name ???file_obj.staff = models.Staff.objects.get(id=staff_id) ???file_obj.qiniu_name = key ???file_obj.remark = remark ???file_obj.save() ???return HttpResponse(json.dumps({'code': 'ok', 'msg': '上传文档成功', 'data': []}), content_type='application/json')# 查看某员工所有自命名文档def staff_all_file(request): ???form = forms.StaffAllfile(request.POST) ???if not form.is_valid(): ???????e = ','.join([form.errors[i][0] for i in form.errors]) if len(form.errors) > 0 else u'未知错误' ???????return HttpResponseBadRequest(json.dumps({'code': 'false', 'msg': e, 'data': []}), ?????????????????????????????????????content_type='application/json') ???staff_id = form.cleaned_data['id'] ???if not models.Staff.objects.filter(id=staff_id).exists(): ???????return HttpResponseBadRequest(json.dumps({'code': 'false', 'msg': '员工不存在', 'data': []}), ?????????????????????????????????????content_type='application/json') ???query_set = models.File.objects.get(id=staff_id).file.all() ???data_list = list() ???for i in query_set: ???????one_file_dic = dict() ???????one_file_dic['name'] = i.name ???????one_file_dic['qiniu_name'] = i.qiniu_name ???????one_file_dic['remark'] = i.remark ???????data_list.append(one_file_dic) ???json_dic = {'code': 'ok', 'data': data_list, 'msg': '查询员工所有文档成功'} ???return HttpResponse(json.dumps(json_dic), content_type='application/json')def download_staff_file(request): ???form = forms.DownloadStafffile(request.POST) ???if not form.is_valid(): ???????e = ','.join([form.errors[i][0] for i in form.errors]) if len(form.errors) > 0 else u'未知错误' ???????return HttpResponseBadRequest(json.dumps({'code': 'false', 'msg': e, 'data': []}), ?????????????????????????????????????content_type='application/json') ???q = 
Auth(QINIU['access_key'], QINIU['secret_key']) ???qiniu_name = form.cleaned_data['qiniu_name'] ???url = 'http://pn9re4v61.bkt.clouddn.com/{}?attname='.format(qiniu_name) ???private_url = q.private_download_url(url, expires=300) ???return redirect(private_url)
Django + 七牛云:上传、查看、下载文件的相关函数(新整理,尚未完全测试)
原文地址:https://www.cnblogs.com/bqwzx/p/10618724.html