我们希望 AS2 服务器可以监听指定目录,当目录发生变化时,自动发送邮件给对应客户。这样我们的 EDI 系统生成文件后,只需要投递到对应的目录即可。
我们使用的 AS2 服务器是基于 django 的,所以我们可以像开发 django 项目一样,自行开发模块来实现扩展。
参考文档 django-pyas2 扩展
首先我们创建 app
$ python manage.py startapp extend_pyas2
此时目录结构如下
{PROJECT DIRECTORY}
└──django_pyas2
├── django_pyas2
│ ├── db.sqlite3
│ ├── manage.py
│ └── django_pyas2
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── extend_pyas2
├── apps.py
├── migrations
├── models.py
├── tests.py
└── views.py
我们希望可以使用指令的形式,所以我们需要添加 commands 文件
指令
$ python manage.py filewatcher
│ └── wsgi.py
└── extend_pyas2
├── apps.py
├── migrations
├── models.py
├── tests.py
├── views.py
└── management
└── commands
└── filewatcher.py
配置 app
INSTALLED_APPS = (
...
'pyas2',
'extend_pyas2',
)
filewatcher.py 代码如下
import os
import time
import logging
import socket
import sys
import atexit
from django.core.management.base import BaseCommand, CommandError
from django.core.management import call_command
from django.utils.translation import gettext as _
from pyas2.models import Organization, Partner
from pyas2 import settings
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserverVFS
from watchdog.events import PatternMatchingEventHandler
# Logger shared by the watcher classes and the management command in this file.
logger = logging.getLogger('django')
# Local TCP port that the command binds on 127.0.0.1 at startup; a failed bind
# is reported as "an instance of the send daemon is already running".
DAEMONPORT = 16388
class FileWatchHandle(PatternMatchingEventHandler):
    """
    Event handler that queues a send task for every created or modified file.

    Directory events are ignored and no patterns are configured, so any file
    that shows up in the watched directory is queued.

    :param tasks: shared collection (must support ``add``) receiving the tasks.
    :param dir_watch: mapping with the ``organization`` and ``partner`` keys
        describing the directory this handler is attached to.
    """

    def __init__(self, tasks, dir_watch):
        super().__init__(ignore_directories=True)
        self.dir_watch = dir_watch
        self.tasks = tasks

    def handle_event(self, event):
        """Add ``(organization, partner, path)`` for *event* to the task queue."""
        path = event.src_path
        task = (self.dir_watch['organization'], self.dir_watch['partner'], path)
        self.tasks.add(task)
        logger.info(u' "%(file)s" created. Adding to Task Queue.', {'file': path})

    def on_modified(self, event):
        """Treat a modification like any other change: queue the file."""
        self.handle_event(event)

    def on_created(self, event):
        """Queue the newly created file."""
        self.handle_event(event)
class Entry:
    """
    Minimal wrapper around a file name.

    Exposes the wrapped value through the ``name`` attribute so that code
    expecting entry objects (rather than plain strings) can read it.
    """

    def __init__(self, name):
        # The only state carried by an entry is the name it was built from.
        self.name = name
class WatchdogObserversManager:
    """
    Creates and keeps track of watchdog observers, one per watched directory.

    Every observer gets the same settings, and directories are scheduled
    non-recursively (subdirectories are not searched).

    :param is_daemon: value assigned to each observer's ``daemon`` attribute.
    :param force_vfs: if the underlying filesystem is a network share, OS events
        cannot be used reliably; when true a polling observer is used instead,
        which is expensive.
    """

    def __init__(self, is_daemon=True, force_vfs=False):
        self.force_vfs = force_vfs
        self.is_daemon = is_daemon
        self.observers = []

    def listdir(self, root):
        """Return the contents of *root* as a list of ``Entry`` objects."""
        # watchdog reads the ``name`` attribute from each entry, so the plain
        # names returned by os.listdir are wrapped in Entry objects to avoid errors.
        return [Entry(item) for item in os.listdir(root)]

    def add_observer(self, tasks, dir_watch):
        """Create, schedule and start an observer for ``dir_watch['path']``."""
        observer = (
            PollingObserverVFS(stat=os.stat, listdir=self.listdir)
            if self.force_vfs
            else Observer()
        )
        observer.daemon = self.is_daemon
        handler = FileWatchHandle(tasks, dir_watch)
        observer.schedule(handler, dir_watch['path'], recursive=False)
        observer.start()
        self.observers.append(observer)

    def stop_all(self):
        """Ask every managed observer to stop."""
        for running in self.observers:
            running.stop()

    def join_all(self):
        """Block until every managed observer has finished."""
        for running in self.observers:
            running.join()
class Command(BaseCommand):
    """
    Management command that watches the partners' outbox directories and calls
    ``sendas2message`` for every file found or dropped there.
    """

    help = _(u'Daemon process that watches the outbox of all as2 partners and '
             u'triggers sendmessage when files become available')

    def handle(self, *args, **options):
        """
        Run the watchdog daemon until it is interrupted or a send fails.

        Steps: bind the single-instance port, build the list of outbox
        directories (one per partner/organization pair), send the files that
        are already present, then start one observer per directory and poll
        the shared task set every two seconds.

        :raises CommandError: if another instance already holds the port.
        """
        logger.info(_(u'Starting PYAS2 send Watchdog daemon.'))
        self._bind_instance_lock()

        # Filled by the observer threads, drained by the loop below.
        tasks = set()
        dir_watch_data = self._collect_watch_dirs()
        if not dir_watch_data:
            logger.error(_(u'No partners have been configured!'))
            sys.exit(0)

        logger.info(_(u'Process existing files in the directory.'))
        for dir_watch in dir_watch_data:
            file_names = [
                name for name in os.listdir(dir_watch['path'])
                if os.path.isfile(os.path.join(dir_watch['path'], name))
            ]
            for file_name in file_names:
                self._send_file(dir_watch['organization'], dir_watch['partner'],
                                os.path.join(dir_watch['path'], file_name),
                                file_name)

        logger.info(_(u'PYAS2 send Watchdog daemon started.'))
        active_receiving = False
        watchdog_file_observers = WatchdogObserversManager(is_daemon=True, force_vfs=True)
        for dir_watch in dir_watch_data:
            watchdog_file_observers.add_observer(tasks, dir_watch)

        try:
            logger.info(_(u'Watchdog awaiting tasks...'))
            while True:
                if tasks:
                    if not active_receiving:
                        # First cycle that sees tasks (after a flush, or at
                        # startup): wait one more cycle before sending.
                        active_receiving = True
                    else:
                        # Observer threads keep adding to ``tasks`` while we
                        # work. Iterating the live set could raise "Set changed
                        # size during iteration", and clearing it afterwards
                        # would drop tasks queued in the meantime. Pop a
                        # bounded batch instead; anything added later stays
                        # queued for the next round.
                        pending = [tasks.pop() for _ in range(len(tasks))]
                        for org_name, partner_name, path in pending:
                            self._send_file(org_name, partner_name, path, path)
                        active_receiving = False
                time.sleep(2)
        except KeyboardInterrupt as msg:
            logger.info(u'Error in running task: "%(msg)s".', {'msg': msg})
        except Exception as msg:
            # Keep the traceback: this error terminates the daemon.
            logger.exception(u'Error in running task: "%(msg)s".', {'msg': msg})

        logger.info(u'Stopping all running Watchdog threads...')
        watchdog_file_observers.stop_all()
        logger.info(u'All Watchdog threads stopped.')
        logger.info(u'Waiting for all Watchdog threads to finish...')
        watchdog_file_observers.join_all()
        logger.info(u'All Watchdog threads finished. Exiting...')
        sys.exit(0)

    def _bind_instance_lock(self):
        """Bind the daemon port so that only one instance can run at a time."""
        engine_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            engine_socket.bind(('127.0.0.1', DAEMONPORT))
        except socket.error:
            engine_socket.close()
            raise CommandError(_(u'An instance of the send daemon is already running'))
        # The registered callback also keeps the socket alive until exit.
        atexit.register(engine_socket.close)

    def _collect_watch_dirs(self):
        """Return one watch description per partner/organization outbox directory."""
        dir_watch_data = []
        # Evaluate once instead of re-running the query for every partner.
        organizations = list(Organization.objects.all())
        for partner in Partner.objects.all():
            for org in organizations:
                outbox_dir = os.path.join(settings.DATA_DIR,
                                          'messages',
                                          partner.as2_name,
                                          'outbox',
                                          org.as2_name)
                if not os.path.exists(outbox_dir):
                    # Create missing folders; exist_ok covers the case where
                    # the directory appears between the check and the call.
                    os.makedirs(outbox_dir, exist_ok=True)
                if os.path.isdir(outbox_dir):
                    dir_watch_data.append({
                        'path': outbox_dir,
                        'organization': org.as2_name,
                        'partner': partner.as2_name
                    })
        return dir_watch_data

    def _send_file(self, org_name, partner_name, path, label):
        """Call ``sendas2message`` for *path* and log the outcome under *label*."""
        # NOTE(review): delete=True presumably removes the file once it has
        # been sent - confirm against the sendas2message command.
        result = call_command('sendas2message', org_name, partner_name,
                              path, delete=True)
        logger.info(u'Send as2 message "%(file)s" from "%(org)s" to "%(partner)s". STATE:%(result)s',
                    {'file': label,
                     'org': org_name,
                     'partner': partner_name, 'result': result})