Skip to Content

django-pyas2 监听文件夹自动发送 EDI 文件

我们希望 AS2 服务器可以监听指定目录,当目录发生变化时,自动将文件发送给对应客户。这样我们的 EDI 系统生成文件后,只需要投递到对应的目录即可。

我们使用的 AS2 服务器是基于 django 的,所以我们可以像开发 django 项目一样,自行开发模块来实现扩展。

参考文档 django-pyas2 扩展

首先我们创建 app

$ python manage.py startapp extend_pyas2

此时目录结构如下

{PROJECT DIRECTORY}
└──django_pyas2
    ├── django_pyas2
    │   ├── db.sqlite3
    │   ├── manage.py
    │   └── django_pyas2
    │       ├── settings.py
    │       ├── urls.py
    │       └── wsgi.py
    └── extend_pyas2
        ├── apps.py
        ├── migrations
        ├── models.py
        ├── tests.py
        └── views.py

我们希望以 manage.py 指令的形式启动监听,所以需要在 app 下添加 management/commands/filewatcher.py 指令文件

指令

$ python manage.py filewatcher
│       └── wsgi.py
└── extend_pyas2
    ├── apps.py
    ├── migrations
    ├── models.py
    ├── tests.py
    ├── views.py
    └── management
        └── commands
            └── filewatcher.py

配置 app

INSTALLED_APPS = (
    ...
    'pyas2',
    'extend_pyas2',
)

filewatcher.py 代码如下

import os

import time

import logging

import socket

import sys

import atexit

from django.core.management.base import BaseCommand, CommandError

from django.core.management import call_command

from django.utils.translation import gettext as _

from pyas2.models import Organization, Partner

from pyas2 import settings

from watchdog.observers import Observer

from watchdog.observers.polling import PollingObserverVFS

from watchdog.events import PatternMatchingEventHandler


# All messages from this module go through the logger named 'django'.
logger = logging.getLogger('django')


# TCP port on 127.0.0.1 that the command binds at start-up; a failed bind is
# reported as "an instance of the send daemon is already running".
DAEMONPORT = 16388



class FileWatchHandle(PatternMatchingEventHandler):
    """
    Event handler that queues a send task for each file event it receives.

    Directory events are ignored and no patterns are configured, so any file
    in the watched directory is queued.
    """

    def __init__(self, tasks, dir_watch):
        """
        :param tasks: shared collection exposing ``add``; it receives
            ``(organization, partner, path)`` tuples.
        :param dir_watch: mapping providing the ``'organization'`` and
            ``'partner'`` values used to build each task.
        """
        super(FileWatchHandle, self).__init__(ignore_directories=True)
        self.dir_watch = dir_watch
        self.tasks = tasks

    def handle_event(self, event):
        """Queue the file named by ``event.src_path`` and log that it was queued."""
        organization = self.dir_watch['organization']
        partner = self.dir_watch['partner']
        source_path = event.src_path
        self.tasks.add((organization, partner, source_path))
        logger.info(u' "%(file)s" created. Adding to Task Queue.', {'file': source_path})

    def on_created(self, event):
        """Treat a newly created file as a send task."""
        self.handle_event(event)

    def on_modified(self, event):
        """Treat a modified file exactly like a newly created one."""
        self.handle_event(event)



class Entry:
    """
    Minimal directory-entry stand-in that exposes only a ``name`` attribute.

    One instance is created per directory entry each time a directory is
    listed, so ``__slots__`` is used to avoid a per-instance ``__dict__``.
    """

    __slots__ = ('name',)

    def __init__(self, name):
        """:param name: the entry's file name."""
        self.name = name

    def __repr__(self):
        return 'Entry(name=%r)' % (self.name,)



class WatchdogObserversManager:
    """
    Owns a list of watchdog observers that all share the same settings.

    Each observer watches a single directory without recursing into
    subdirectories.

    :param is_daemon: value assigned to every observer's ``daemon`` flag.
    :param force_vfs: when true, use the polling observer instead of the
        default one. Intended for network shares, where OS events cannot be
        used reliably; polling is expensive.
    """

    def __init__(self, is_daemon=True, force_vfs=False):
        self.force_vfs = force_vfs
        self.is_daemon = is_daemon
        self.observers = []

    def listdir(self, root):
        """Return the entries of ``root`` as objects carrying a ``name`` attribute."""
        # watchdog reads ``name`` from every entry, so the plain strings from
        # os.listdir are wrapped in Entry objects to avoid an error.
        return [Entry(entry_name) for entry_name in os.listdir(root)]

    def add_observer(self, tasks, dir_watch):
        """
        Create, start and remember an observer for ``dir_watch['path']``.

        :param tasks: shared task collection handed to the event handler.
        :param dir_watch: mapping with a ``'path'`` key, also passed to the handler.
        """
        observer = (
            PollingObserverVFS(stat=os.stat, listdir=self.listdir)
            if self.force_vfs
            else Observer()
        )
        observer.daemon = self.is_daemon
        handler = FileWatchHandle(tasks, dir_watch)
        observer.schedule(handler, dir_watch['path'], recursive=False)
        observer.start()
        self.observers.append(observer)

    def stop_all(self):
        """Call ``stop()`` on every managed observer, in insertion order."""
        for running in self.observers:
            running.stop()

    def join_all(self):
        """Call ``join()`` on every managed observer, in insertion order."""
        for running in self.observers:
            running.join()



class Command(BaseCommand):
    """
    Management command that watches the outbox directory of every
    partner/organization pair and sends each file found there with the
    ``sendas2message`` command.
    """

    help = _(u'Daemon process that watches the outbox of all as2 partners and '
             u'triggers sendmessage when files become available')

    @staticmethod
    def _send_file(organization, partner, path):
        """
        Send one file through ``sendas2message`` (with ``delete=True``) and log the result.

        Any exception raised by the command is logged with its traceback and
        swallowed, so a single bad file cannot terminate the watcher.
        """
        log_context = {'file': path, 'org': organization, 'partner': partner}
        try:
            result = call_command('sendas2message', organization, partner, path, delete=True)
        except Exception:
            logger.exception(
                u'Failed to send as2 message "%(file)s" from "%(org)s" to "%(partner)s".',
                log_context)
            return
        log_context['result'] = result
        logger.info(
            u'Send as2 message "%(file)s" from "%(org)s" to "%(partner)s". STATE:%(result)s',
            log_context)

    @staticmethod
    def _collect_outboxes():
        """
        Build the list of directories to watch, creating missing ones.

        :return: list of dicts with ``'path'``, ``'organization'`` and
            ``'partner'`` keys, one per partner/organization pair.
        """
        # Evaluate the organization query once instead of once per partner.
        organizations = list(Organization.objects.all())
        watched = []
        for partner in Partner.objects.all():
            for org in organizations:
                outbox_dir = os.path.join(settings.DATA_DIR,
                                          'messages',
                                          partner.as2_name,
                                          'outbox',
                                          org.as2_name)
                try:
                    # Create the folder if it does not exist yet; exist_ok
                    # avoids the race between a separate check and the create.
                    os.makedirs(outbox_dir, exist_ok=True)
                except FileExistsError:
                    # The path exists but is not a directory: it cannot be watched.
                    logger.warning(u'Outbox path "%(path)s" is not a directory. Skipping.',
                                   {'path': outbox_dir})
                    continue
                watched.append({
                    'path': outbox_dir,
                    'organization': org.as2_name,
                    'partner': partner.as2_name,
                })
        return watched

    def handle(self, *args, **options):
        """
        Run the watcher until it is interrupted or fails.

        Steps: bind the localhost port that acts as a single-instance lock,
        send the files already present in every outbox, then start one
        observer per outbox and process queued tasks every two seconds.

        :raises CommandError: if the lock port cannot be bound.

        The command always finishes through ``sys.exit(0)``.
        """
        logger.info(_(u'Starting PYAS2 send Watchdog daemon.'))
        engine_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            engine_socket.bind(('127.0.0.1', DAEMONPORT))
        except socket.error:
            engine_socket.close()
            raise CommandError(_(u'An instance of the send daemon is already running'))
        else:
            atexit.register(engine_socket.close)

        tasks = set()
        dir_watch_data = self._collect_outboxes()

        if not dir_watch_data:
            logger.error(_(u'No partners have been configured!'))
            sys.exit(0)

        logger.info(_(u'Process existing files in the directory.'))
        for dir_watch in dir_watch_data:
            for file_name in os.listdir(dir_watch['path']):
                file_path = os.path.join(dir_watch['path'], file_name)
                if os.path.isfile(file_path):
                    self._send_file(dir_watch['organization'], dir_watch['partner'], file_path)

        logger.info(_(u'PYAS2 send Watchdog daemon started.'))
        active_receiving = False
        watchdog_file_observers = WatchdogObserversManager(is_daemon=True, force_vfs=True)
        for dir_watch in dir_watch_data:
            watchdog_file_observers.add_observer(tasks, dir_watch)

        try:
            logger.info(_(u'Watchdog awaiting tasks...'))
            while True:
                if tasks:
                    if not active_receiving:
                        # First cycle that sees pending tasks: wait one more
                        # cycle before sending.
                        active_receiving = True
                    else:
                        # Observer threads add to ``tasks`` concurrently, so
                        # iterate over a snapshot and afterwards remove only
                        # what was processed; tasks queued in the meantime
                        # are kept for the next cycle.
                        batch = tuple(tasks)
                        for organization, partner, file_path in batch:
                            self._send_file(organization, partner, file_path)
                        tasks.difference_update(batch)
                        active_receiving = False
                time.sleep(2)
        except KeyboardInterrupt:
            logger.info(u'Interrupt received. Shutting down Watchdog daemon.')
        except Exception as msg:
            logger.exception(u'Error in running task: "%(msg)s".', {'msg': msg})
        finally:
            logger.info(u'Stopping all running Watchdog threads...')
            watchdog_file_observers.stop_all()
            logger.info(u'All Watchdog threads stopped.')

        logger.info(u'Waiting for all Watchdog threads to finish...')
        watchdog_file_observers.join_all()
        logger.info(u'All Watchdog threads finished. Exiting...')
        sys.exit(0)




in Odoo
自定义 systemctl 指令并自启动