#!/usr/bin/python # -*- coding:utf-8 -*- ############################################################################### # dosya : modbustcp.py # ilgili : www.emrah.com # # Open MODBUS TCP protokolu icin gerekli siniflarin tanimlandigi modul. Su an # icin sadece slave sinifi mevcut. # ############################################################################### import socket # Debug yapilacak mi? Test ortami icin 1, gercek calisma ortami icin 0 DEBUG = 1 # ----------------------------------------------------------------------------- # MODBUS slave sinifi class slave: READ_COIL = "\x01" READ_BIT_INPUT = "\x02" READ_REGISTER = "\x03" READ_WORD_INPUT = "\x04" WRITE_COIL = "\x05" WRITE_REGISTER = "\x06" # ------------------------------------------------------------------------- # Nesne olusturulurken calisacak olan metod. Ilk degerler bu bolumde set # edilebilir. def __init__(self, address='192.168.1.2', port=502, timeout=10, unitid=1): try: # IP adresi self.address = address # TCP portu self.port = int(port) # Socket baglantilari icin timeout suresi self.timeout = float(timeout) # Unit ID self.unitid = unitid # Transaction identifier self.transid0 = '\x00' self.transid1 = '\x00' # Protocol identifier self.protid0 = '\x00' self.protid1 = '\x00' except Exception, err: if DEBUG: raise else: print(str(err)) # ------------------------------------------------------------------------- # Istek metodu. Block degiskeni icinde yer alan veri, slave aygita # gonderilip cevabi alinir. Cevap islenmeden dondurulur. def request(self, block): try: # socket baglantisini olustur, istek blogunu gonder ve cevabi al. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.timeout) sock.connect((self.address, self.port)) sock.send(block) res = sock.recv(10000) sock.close() result = res except Exception, err: if DEBUG: raise else: print(str(err)) # socket acik kalmissa kapat try: sock.close() except: pass result = None return result # ------------------------------------------------------------------------- # Veri bloklarinin basinda yer alacak olan header bolumu. Ozel bir durum # olmadikca sabit bir deger alacak. def get_header(self): try: result = "%c%c%c%c" % \ (self.transid0, self.transid1, self.protid0, self.protid0) except Exception, err: if DEBUG: raise else: print(str(err)) result = None return result # ------------------------------------------------------------------------- # Verilen sayiyi, iki hex degerden olusan hex ciftine ceviren bolum. def num_to_hex_pair(self, num): try: # num tamsayi degilse hata var demektir. if not isinstance(num, int): raise NameError('Tamsayi beklenirken hata olustu') # num pozitif bir tamsayi degilse hata var demektir. if not (num >= 0): raise NameError('Pozitif sayi beklenirken hata olustu') hexnum = '%04x' % num # Sayi 4 haneli ifade edilemiyorsa hata var demektir. if len(hexnum) != 4: raise NameError('Sayi 4 haneli degil') hex1 = "%c" % (int(hexnum[0:2], 16)) hex2 = "%c" % (int(hexnum[2:4], 16)) result = "%s%s" % (hex1, hex2) except Exception, err: if DEBUG: raise else: print(str(err)) result = None return result # ------------------------------------------------------------------------- # Okuma blogunu olusturan bolum. Belirtilen fonksiyon koduna gore coil, # input veya register okuma blogunu olusturur. def read_block(self, func, ref, count): try: # standart header block = self.get_header() # veri alani uzunlugu block += self.num_to_hex_pair(6) # Unit ID block += '%c' % self.unitid # Fonksiyon kodu block += func # Okuma adresi block += self.num_to_hex_pair(ref) # Okunacak alan uzunlugu block += self.num_to_hex_pair(count) # Hazirlanan blogu cevap olarak dondur. result = block except Exception, err: if DEBUG: raise else: print(str(err)) result = None return result # ------------------------------------------------------------------------- # Metin olarak gelen degerleri, word listesine ceviren bolum. Metin icinde # word degerler hexadecimal olarak belirtilmis olacak. def to_word_list(self, strdata): try: wordlist = [] # Sirayla her bir word degeri al ve word listesine ekle. while len(strdata) > 0: word = ord(strdata[0])*256 + ord(strdata[1]) wordlist.append(word) # Eklenen kismi sil, stringin kalanini al. strdata = strdata[2:] result = wordlist except Exception, err: if DEBUG: raise else: print(str(err)) result = [] return result # ------------------------------------------------------------------------- # Metin olarak gelen degerleri, bit listesine ceviren bolum. def to_bit_list(self, strdata): try: bitlist = [] # Sirayla her bir byte degeri al for i in range(len(strdata)): # Byte degeri, binary stringe cevir. strbyte = "{0:08b}".format(ord(strdata[i])) # Binary string'i ters sirada diz. strbyte = strbyte[::-1] # Sirayla her bir biti, bit listesine ekle for i in range(8): bitlist.append(int(strbyte[i])) result = bitlist except Exception, err: if DEBUG: raise else: print(str(err)) result = [] return result # ------------------------------------------------------------------------- # PLC'den gelen okuma cevaplarini parse eden bolum. Gelecek verilerin word # tipinde olup olmayacagi isword degeri ile belirtilir. Word degilse bit # tipinde oldugu varsayilir. def parse_read_response(self, response, isword=False): try: # Cevap icinde body bolumunun byte olarak uzunlugu. Belirtilen # uzunluk ile body'nin uzunlugu esit degilse hata var demektir. bodylen = ord(response[5]) if bodylen != len(response[6:]): raise NameError('Cevapta, body alani uzunlugu hatali') # Unit id. unitid = ord(response[6]) # Fonksiyon kodu. Fonksiyon kodu 0x79'dan buyukse PLC'den # 'hata var' uyarisi gelmis demektir. func = ord(response[7]) if func > 0x79: raise NameError("Cevapta 'hata var' uyarisi") # Cevap icinde veri bolumunun byte olarak uzunlugu. datalen = ord(response[8]) if datalen != len(response[9:]): raise NameError('Cevapta, veri alani uzunlugu hatali') # Veri tipine gore veri listesini olustur. if isword: datalist = self.to_word_list(response[9:]) else: datalist = self.to_bit_list(response[9:]) result = (unitid, func, datalist) except Exception, err: if DEBUG: raise else: print(str(err)) result = (None, None, []) return result # ------------------------------------------------------------------------- # "ref" adresinden itibaren "count" adet rolenin degerini okuyan bolum. def read_coil(self, ref, count): try: # Role deger(ler)ini okumak icin kullanilacak blogu olustur. block = self.read_block(self.READ_COIL, ref, count) block = self.read_block('\x00', ref, count) if block is None: raise NameError('Role okuma komutu olusturulamadi') # Okuma islemini gerceklestir. Cevap None gelirse hata var # demektir. res = self.request(block) if res is None: raise NameError('PLC haberlesme hatasi') # Role okuma cevabini parse et. (unitid, func, datalist) = self.parse_read_response(res, False) # Unitid None gelirse hata var demektir. if unitid is None: raise NameError('Cevap parse edilemedi') # Unit ID, komutta kullanilanla ayni degilse hata var demektir. if unitid != self.unitid: raise NameError('Gelen cevapta Unit ID hatali') # Gelen cevap, role okuma komutu icin degilse hata var demektir. if func != ord(self.READ_COIL): raise NameError('Gelen cevap, role okuma cevabi degil') result = datalist except Exception, err: if DEBUG: raise else: print(str(err)) result = None return result # ------------------------------------------------------------------------- # "ref" adresinden itibaren "count" adet digital girisin degerini okuyan # bolum. def read_bit_input(self, ref, count): try: # Input deger(ler)ini okumak icin kullanilacak blogu olustur. block = self.read_block(self.READ_BIT_INPUT, ref, count) if block is None: raise NameError('Digital giris okuma komutu olusturulamadi') # Okuma islemini gerceklestir. Cevap None gelirse hata var # demektir. res = self.request(block) if res is None: raise NameError('PLC haberlesme hatasi') # Bit okuma cevabini parse et. (unitid, func, datalist) = self.parse_read_response(res, False) # Unitid None gelirse hata var demektir. if unitid is None: raise NameError('Cevap parse edilemedi') # Unit ID, komutta kullanilanla ayni degilse hata var demektir. if unitid != self.unitid: raise NameError('Gelen cevapta Unit ID hatali') # Gelen cevap, bit okuma komutu icin degilse hata var demektir. if func != ord(self.READ_BIT_INPUT): raise NameError('Gelen cevap, bit okuma cevabi degil') result = datalist except Exception, err: if DEBUG: raise else: print(str(err)) result = None return result # ------------------------------------------------------------------------- # "ref" adresinden itibaren "count" adet registerin degerini okuyan bolum. def read_register(self, ref, count): try: # Register deger(ler)ini okumak icin kullanilacak blogu olustur. block = self.read_block(self.READ_REGISTER, ref, count) if block is None: raise NameError('Register okuma komutu olusturulamadi') # Okuma islemini gerceklestir. Cevap None gelirse hata var # demektir. res = self.request(block) if res is None: raise NameError('PLC haberlesme hatasi') # Register okuma cevabini parse et. (unitid, func, datalist) = self.parse_read_response(res, True) # Unitid None gelirse hata var demektir. if unitid is None: raise NameError('Cevap parse edilemedi') # Unit ID, komutta kullanilanla ayni degilse hata var demektir. if unitid != self.unitid: raise NameError('Gelen cevapta Unit ID hatali') # Gelen cevap, register okuma komutu icin degilse hata var # demektir. if func != ord(self.READ_REGISTER): raise NameError('Gelen cevap, register okuma cevabi degil') result = datalist except Exception, err: if DEBUG: raise else: print(str(err)) result = None return result # ------------------------------------------------------------------------- # "ref" adresinden itibaren "count" adet analog giris degerini okuyan bolum def read_word_input(self, ref, count): try: # Analog giris deger(ler)ini okumak icin kullanilacak blogu olustur block = self.read_block(self.READ_WORD_INPUT, ref, count) if block is None: raise NameError('Analog giris okuma komutu olusturulamadi') # Okuma islemini gerceklestir. Cevap None gelirse hata var # demektir. res = self.request(block) if res is None: raise NameError('PLC haberlesme hatasi') # Analog okuma cevabini parse et. (unitid, func, datalist) = self.parse_read_response(res, True) # Unitid None gelirse hata var demektir. if unitid is None: raise NameError('Cevap parse edilemedi') # Unit ID, komutta kullanilanla ayni degilse hata var demektir. if unitid != self.unitid: raise NameError('Gelen cevapta Unit ID hatali') # Gelen cevap, register okuma komutu icin degilse hata var # demektir. if func != ord(self.READ_WORD_INPUT): raise NameError('Gelen cevap, analog okuma cevabi degil') result = datalist except Exception, err: if DEBUG: raise else: print(str(err)) result = None return result # ------------------------------------------------------------------------- # Yazma blogunu olusturan bolum. Belirtilen fonksiyon koduna gore coil veya # register yazma blogunu olusturur. def write_block(self, func, ref, value): try: # standart header block = self.get_header() # veri alani uzunlugu block += self.num_to_hex_pair(6) # Unit ID block += '%c' % self.unitid # Fonksiyon kodu block += func # Yazma adresi block += self.num_to_hex_pair(ref) # Yazilacak deger block += self.num_to_hex_pair(value) # Hazirlanan blogu cevap olarak dondur. result = block except Exception, err: if DEBUG: raise else: print(str(err)) result = None return result # ------------------------------------------------------------------------- # "ref" adresindeki roleyi set eden bolum. def set_coil(self, ref): try: # Roleyi set etmek icin kullanilacak blogu olustur. block = self.write_block(self.WRITE_COIL, ref, 0xFF00) if block is None: raise NameError('Role set etme komutu olusturulamadi') # Role set etme islemini gerceklestir. result = self.request(block) # Gonderilen komut ve gelen cevap ayni degilse hata var demektir. if block != result: raise NameError('Role set edilemedi') result = True except Exception, err: if DEBUG: raise else: print(str(err)) result = False return result # ------------------------------------------------------------------------- # "ref" adresindeki roleyi resetleyen bolum. def reset_coil(self, ref): try: # Roleyi set etmek icin kullanilacak blogu olustur. block = self.write_block(self.WRITE_COIL, ref, 0x0000) if block is None: raise NameError('Role resetleme komutu olusturulamadi') # Role resetleme islemini gerceklestir. result = self.request(block) # Gonderilen komut ve gelen cevap ayni degilse hata var demektir. if block != result: raise NameError('Role resetlenemedi') result = True except Exception, err: if DEBUG: raise else: print(str(err)) result = False return result # ------------------------------------------------------------------------- # "ref" adresindeki registera 'value' degerini yazan bolum. def write_register(self, ref, value): try: # Register yazarken kullanilacak blogu olustur. block = self.write_block(self.WRITE_REGISTER, ref, value) if block is None: raise NameError('Register yazma komutu olusturulamadi') # Register yazma islemini gerceklestir. result = self.request(block) # Gonderilen komut ve gelen cevap ayni degilse hata var demektir. if block != result: raise NameError('Register degeri yazilamadi') result = True except Exception, err: if DEBUG: raise else: print(str(err)) result = False return result # ----------------------------------------------------------------------------- if __name__ == '__main__': cimon = slave(address='192.168.1.30', unitid=1, timeout=3) for i in range(8): cimon.reset_coil(i) print(cimon.read_coil(0, 8)) cimon.set_coil(0) cimon.set_coil(3) cimon.set_coil(5) print(cimon.read_coil(0, 8)) #print(cimon.read_register(100, 1)) #cimon.write_register(100, 1234) #print(cimon.read_register(100, 1)) #print(cimon.read_word_input(20, 3))