# Standard library
import json
import os
import os.path
import pathlib
import shutil
import sys
import textwrap
import time
import urllib.request
from random import randint

# Third-party
import easyocr
import google.generativeai as genai
import mysql.connector
import redis
from IPython.display import display, Markdown
from paddleocr import PaddleOCR, draw_ocr
from PIL import Image

# Local
from config import import_env

# Load environment variables (DB/Redis/OSS/Google credentials) before any
# client below is configured.
import_env()
# EasyOCR reader, currently disabled in favor of PaddleOCR below.
# NOTE(review): nlp_image() still expects a `reader` to exist — confirm
# before calling that function with this line commented out.
# reader = easyocr.Reader(['en'], gpu=False)

# Shared PaddleOCR instance: English model, CPU-only, with text-angle
# classification enabled.
ocr = PaddleOCR(use_angle_cls=True, lang="en", use_gpu=False )

# Configure the Gemini client; GOOGLE_API_KEY is expected to be loaded
# into the environment by import_env().
genai.configure(api_key=os.environ.get('GOOGLE_API_KEY'))
def paddle_nlp_image(path):
    """Run PaddleOCR on the image at *path* and return the recognized text lines.

    If the file is missing locally it is first downloaded from
    ``OSS_HOST + path``.

    Returns:
        list[str]: recognized text lines (possibly empty), or
        ``False`` when the image could not be downloaded.
    """
    dst = path
    if not os.path.exists(dst):
        src = os.environ.get('OSS_HOST') + dst
        print("image src " + src)
        try:
            urllib.request.urlretrieve(src, dst)
        except (OSError, ValueError):
            # URLError subclasses OSError; ValueError covers malformed URLs.
            # (Original used a bare except: — narrowed to download errors.)
            return False
    print("processing the images " + dst)
    result = ocr.ocr(dst, cls=True)
    texts = []
    for res in result:
        try:
            for line in res:
                # Each line is [box, (text, confidence)] — keep the text only.
                print(line[1][0])
                texts.append(line[1][0])
        except TypeError:
            # PaddleOCR yields None for images with no detections, which
            # makes the inner loop raise TypeError.
            print("Error")
            return []
    return texts
def nlp_image(path):
    """Run EasyOCR on the image at *path* and return the recognized strings.

    Downloads the file from ``OSS_HOST + path`` when it is not present
    locally.

    Returns:
        list[str]: recognized text, or ``False`` when the download fails.
    """
    global reader
    # The module-level `reader` initialization is commented out; create it
    # lazily on first use so this function does not raise NameError.
    if 'reader' not in globals():
        reader = easyocr.Reader(['en'], gpu=False)
    dst = path
    if not os.path.exists(dst):
        src = os.environ.get('OSS_HOST') + dst
        print("image src " + src)
        try:
            urllib.request.urlretrieve(src, dst)
        except (OSError, ValueError):
            # Original caught only ValueError; network failures raise
            # URLError (an OSError subclass) and must be reported too.
            return False
    result = reader.readtext(path, detail=0)
    print(result)
    return result
def search_from_google_ai(path):
    """Describe the image at *path* using the Gemini vision model.

    Downloads the file from ``OSS_HOST + path`` when it is not present
    locally.

    Returns:
        str: the model's text response, or ``False`` when the image cannot
        be downloaded or the response has no usable text.
    """
    model = genai.GenerativeModel('gemini-pro-vision')
    dst = path
    if not os.path.exists(dst):
        src = os.environ.get('OSS_HOST') + dst
        print("image src " + src)
        try:
            urllib.request.urlretrieve(src, dst)
        except (OSError, ValueError):
            # Narrowed from a bare except: to download-related errors.
            return False
    img = Image.open(dst)
    try:
        response = model.generate_content(img)
        print(response.text)
        return response.text
    except ValueError:
        # response.text raises ValueError when the candidate was blocked
        # or contains no text part.
        return False
    # Unreachable trailing prints/return after the try/except were removed:
    # both branches above already return.
# --- Batch job: fill in img_desc for gallery rows that lack one, using
# --- PaddleOCR text plus a Gemini caption (cached in Redis) as fallback.
db = mysql.connector.connect(
    host=os.environ.get('DB_HOST'),
    user=os.environ.get('DB_USERNAME'),
    password=os.environ.get('DB_PASSWORD'),
    database=os.environ.get('DB_DATABASE'),
)
r = redis.Redis(
    host=os.environ.get('REDIS_HOST'),
    port=os.environ.get('REDIS_PORT'),
    db=os.environ.get('REDIS_DB'),
    password=os.environ.get('REDIS_PASSWORD'),
    decode_responses=True,
    protocol=3,
)
cursor = db.cursor()

cursor.execute("SELECT * from sd_goods_gallery where img_desc is null order by img_id desc limit 10000")
items = cursor.fetchall()

for item in items:
    # Row layout (from sd_goods_gallery): item[0] = img_id, item[5] = image path.
    img_id = item[0]
    img_path = item[5]
    print("start " + str(img_id))
    print("images " + img_path)

    # OCR pass; returns a list of text lines, [] on parse failure, or
    # False when the image could not be downloaded.
    result = paddle_nlp_image(img_path)

    # Gemini caption, cached per row in Redis so reruns skip the API call.
    cache_key = str(img_id) + "_gallery_ai_images"
    ai_result = r.get(cache_key)
    print(ai_result)
    if not r.exists(cache_key):
        ai_result = search_from_google_ai(img_path)
        if ai_result is not False:
            r.set(cache_key, ai_result)

    # Description = OCR lines, or the AI caption as fallback when OCR
    # found nothing. BUGFIX: the original json.dumps'd False/None results,
    # writing 'false' / '[false]' into img_desc — now the row is skipped
    # entirely when neither pass produced text.
    texts = result if isinstance(result, list) else []
    if not texts and ai_result:
        texts = [ai_result]
    if texts:
        y = json.dumps(texts)
        print(y)
        print(ai_result)
        cursor.execute(u'''Update sd_goods_gallery set img_desc=%s where img_id=%s;''', (y, img_id))
        db.commit()
# One-time warm-up only: uncomment to pre-load the OCR model into memory.
# nlp_image()