Files
Jonas Zeunert 4309a2d185 Add Code
2024-08-16 21:57:55 +02:00

915 lines
31 KiB
Python

"""
Copyright (c) 2018-2019 Kaiyu Liu,, All rights reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
Version 3 as published by the Free Software Foundation; either
or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""
import os
import asyncio
from pymata_aio.constants import Constants
try:
from pyboard_core import PyBoardCore
except ImportError:
from .pyboard_core import PyBoardCore
class PyBoard(Constants):
"""
This class exposes and implements a proxy API for the pymata_core asyncio
API, If your application does not use asyncio directly, this is
the API that you should use.
"""
def __init__(self, com_port=None, arduino_wait=2, sleep_tune=0.0001, log_output=False,
ip_address=None, ip_port=2000, ip_handshake='*HELLO*'):
"""
Constructor for the PyBoard API
If log_output is set to True, a log file called 'pymata_log'
will be created in the current directory and all pymata_aio output
will be redirected to the log with no output appearing on the console.
:param arduino_wait: Amount of time to allow Arduino to finish its
reset (2 seconds for Uno, Leonardo can be 0)
:param sleep_tune: This parameter sets the amount of time PyMata core
uses to set asyncio.sleep
:param log_output: If True, pymata_aio.log is created and all
console output is redirected to this file.
:param com_port: If specified, auto port detection will not be
performed and this com port will be used.
:param ip_address: If using a WiFly module, set its address here
:param ip_port: Port to used with ip_address
:param ip_handshake: Connectivity handshake string sent by IP device
:returns: None
"""
self.log_out = log_output
self.loop = asyncio.get_event_loop()
self.sleep_tune = sleep_tune
self.core = PyBoardCore(arduino_wait, self.sleep_tune, log_output,
com_port, ip_address, ip_port, ip_handshake)
os.system('cls||clear')
self.core.start()
self.sleep(1)
def analogRead(self, pin):
"""
Retrieve the last data update for the specified analog pin.
It is intended for a polling application.
:param pin: Analog pin number (ex. A2 is specified as 2)
:returns: Last value reported for the analog pin
"""
task = asyncio.ensure_future(self.core.analog_read(pin))
value = self.loop.run_until_complete(task)
return value
def analogWrite(self, pin, value):
"""
Set the selected PWM pin to the specified value.
:param pin: PWM pin number
:param value: Set the selected pin to the specified
value. 0-0x4000 (14 bits)
:returns: No return value
"""
task = asyncio.ensure_future(self.core.analog_write(pin, value))
self.loop.run_until_complete(task)
def digitalRead(self, pin):
"""
Retrieve the last data update for the specified digital pin.
It is intended for a polling application.
:param pin: Digital pin number
:returns: Last value reported for the digital pin
"""
task = asyncio.ensure_future(self.core.digital_read(pin))
value = self.loop.run_until_complete(task)
return value
def digitalPinWrite(self, pin, value=0):
"""
Set the specified digital input pin to the provided value
:param pin: Digital pin to be set
:param value: 0 or 1
:returns: No return value
"""
task = asyncio.ensure_future(self.core.digital_pin_write(pin, value))
self.loop.run_until_complete(task)
def digitalWrite(self, pin, value=0):
"""
Set the specified digital input pin to the provided value
:param pin: Digital pin to be set
:param value: 0 or 1
:returns: No return value
"""
task = asyncio.ensure_future(self.core.digital_write(pin, value))
self.loop.run_until_complete(task)
def disableAnalogReporting(self, pin):
"""
Disables analog reporting for a single analog pin.
:param pin: Analog pin number. For example for A0, the number is 0.
:returns: No return value
"""
task = asyncio.ensure_future(self.core.disable_analog_reporting(pin))
self.loop.run_until_complete(task)
def disableDigitalReporting(self, pin):
"""
Disables digital reporting. By turning reporting off for this pin,
reporting is disabled for all 8 bits in the "port"
:param pin: Pin and all pins for this port
:returns: No return value
"""
task = asyncio.ensure_future(self.core.disable_digital_reporting(pin))
self.loop.run_until_complete(task)
def configEncoder(self, pin_a, pin_b, cb=None, cb_type=None,
hall_encoder=False):
"""
This command enables the rotary encoder (2 pin + ground) and will
enable encoder reporting.
This is a FirmataPlus feature.
Encoder data is retrieved by performing a digital_read from
pin a (encoder pin 1)
:param pin_a: Encoder pin 1.
:param pin_b: Encoder pin 2.
:param cb: callback function to report encoder changes
:param cb_type: Constants.CB_TYPE_DIRECT = direct call
or Constants.CB_TYPE_ASYNCIO = asyncio coroutine
:param hall_encoder: wheel hall_encoder - set to True to select
hall encoder support support.
:returns: No return value
"""
task = asyncio.ensure_future(self.core.encoder_config(pin_a, pin_b,
cb, cb_type,
hall_encoder))
self.loop.run_until_complete(task)
def encoderRead(self, pin):
"""
This method retrieves the latest encoder data value.
It is a FirmataPlus feature.
:param pin: Encoder Pin
:returns: encoder data value
"""
try:
task = asyncio.ensure_future(self.core.encoder_read(pin))
value = self.loop.run_until_complete(task)
return value
except RuntimeError:
self.shutdown()
def enableAnalogReporting(self, pin):
"""
Enables analog reporting for a single analog pin,
:param pin: Analog pin number. For example for A0, the number is 0.
:returns: No return value
"""
task = asyncio.ensure_future(self.core.enable_analog_reporting(pin))
self.loop.run_until_complete(task)
def enableDigitalReporting(self, pin):
"""
Enables digital reporting. By turning reporting on for all
8 bits in the "port".
This is part of Firmata's protocol specification.
:param pin: Pin and all pins for this port
:returns: No return value
"""
task = asyncio.ensure_future(self.core.enable_digital_reporting(pin))
self.loop.run_until_complete(task)
def extendedAnalog(self, pin, data):
"""
This method will send an extended-data analog write command
to the selected pin..
:param pin: 0 - 127
:param data: 0 - 0-0x4000 (14 bits)
:returns: No return value
"""
task = asyncio.ensure_future(self.core.extended_analog(pin, data))
self.loop.run_until_complete(task)
def getAnalogLatchData(self, pin):
"""
A list is returned containing the latch state for the pin, the
latched value, and the time stamp
[pin_num, latch_state, latched_value, time_stamp]
If the the latch state is LATCH_LATCHED, the table is reset
(data and timestamp set to zero)
It is intended to be used for a polling application.
:param pin: Pin number.
:returns: [latched_state, threshold_type, threshold_value,
latched_data, time_stamp]
"""
task = asyncio.ensure_future(self.core.get_analog_latch_data(pin))
l_data = self.loop.run_until_complete(task)
return l_data
def getAnalogMap(self, cb=None):
"""
This method requests and returns an analog map.
:param cb: Optional callback reference
:returns: An analog map response or None if a timeout occurs
"""
task = asyncio.ensure_future(self.core.get_analog_map())
report = self.loop.run_until_complete(task)
if cb:
cb(report)
else:
return report
def getCapabilityReport(self, raw=True, cb=None):
"""
This method retrieves the Firmata capability report
:param raw: If True, it either stores or provides the callback
with a report as list.
If False, prints a formatted report to the console
:param cb: Optional callback reference to receive a raw report
:returns: capability report
"""
task = asyncio.ensure_future(self.core.get_capability_report())
report = self.loop.run_until_complete(task)
if raw:
if cb:
cb(report)
else:
return report
else:
# noinspection PyProtectedMember
self.core._format_capability_report(report)
def getDigitalLatchData(self, pin):
"""
A list is returned containing the latch state for the pin, the
latched value, and the time stamp
[pin_num, latch_state, latched_value, time_stamp]
If the the latch state is LATCH_LATCHED, the table is reset
(data and timestamp set to zero).
It is intended for use by a polling application.
:param pin: Pin number.
:returns: [latched_state, threshold_type, threshold_value,
latched_data, time_stamp]
"""
task = asyncio.ensure_future(self.core.get_digital_latch_data(pin))
l_data = self.loop.run_until_complete(task)
return l_data
def getFirmwareVersion(self, cb=None):
"""
This method retrieves the Firmata firmware version
:param cb: Reference to a callback function
:returns:If no callback is specified, the firmware version
"""
task = asyncio.ensure_future(self.core.get_firmware_version())
version = self.loop.run_until_complete(task)
if cb:
cb(version)
else:
return version
def getProtocolVersion(self, cb=None):
"""
This method retrieves the Firmata protocol version
:param cb: Optional callback reference.
:returns:If no callback is specified, the firmware version
"""
task = asyncio.ensure_future(self.core.get_protocol_version())
version = self.loop.run_until_complete(task)
if cb:
cb(version)
else:
return version
def getPinState(self, pin, cb=None):
"""
This method retrieves a pin state report for the specified pin
:param pin: Pin of interest
:param cb: optional callback reference
:returns: pin state report
"""
task = asyncio.ensure_future(self.core.get_pin_state(pin))
report = self.loop.run_until_complete(task)
if cb:
cb(report)
else:
return report
def get_pymata_version(self):
"""
This method retrieves the firmata version number
:returns: PyMata version number.
"""
task = asyncio.ensure_future(self.core.get_pymata_version())
self.loop.run_until_complete(task)
def configI2C(self, read_delay_time=0):
"""
This method configures Arduino i2c with an optional read delay time.
:param read_delay_time: firmata i2c delay time
:returns: No return value
"""
task = asyncio.ensure_future(self.core.i2c_config(read_delay_time))
self.loop.run_until_complete(task)
def readI2CData(self, address):
"""
Retrieve result of last data read from i2c device.
i2c_read_request should be called before trying to retrieve data.
It is intended for use by a polling application.
:param address: i2c
:returns: last data read or None if no data is present.
"""
task = asyncio.ensure_future(self.core.i2c_read_data(address))
value = self.loop.run_until_complete(task)
return value
def readI2CRequest(self, address, register, number_of_bytes, read_type,
cb=None, cb_type=None):
"""
This method issues an i2c read request for a single read,continuous
read or a stop, specified by the read_type.
Because different i2c devices return data at different rates,
if a callback is not specified, the user must first call this method
and then call i2c_read_data after waiting for sufficient time for the
i2c device to respond.
Some devices require that transmission be restarted
(e.g. MMA8452Q accelerometer).
Use I2C_READ | I2C_RESTART_TX for those cases.
:param address: i2c device
:param register: i2c register number
:param number_of_bytes: number of bytes to be returned
:param read_type: Constants.I2C_READ, Constants.I2C_READ_CONTINUOUSLY
or Constants.I2C_STOP_READING.
Constants.I2C_RESTART_TX may be OR'ed when required
:param cb: optional callback reference
:param cb_type: Constants.CB_TYPE_DIRECT = direct call or
Constants.CB_TYPE_ASYNCIO = asyncio coroutine
:returns: No return value
"""
task = asyncio.ensure_future(self.core.i2c_read_request(address, register,
number_of_bytes,
read_type,
cb,
cb_type))
self.loop.run_until_complete(task)
def writeI2CRequest(self, address, args):
"""
Write data to an i2c device.
:param address: i2c device address
:param args: A variable number of bytes to be sent to the device
passed in as a list.
:returns: No return value
"""
task = asyncio.ensure_future(self.core.i2c_write_request(address, args))
self.loop.run_until_complete(task)
def keepAlive(self, period=1, margin=.3):
"""
Periodically send a keep alive message to the Arduino.
Frequency of keep alive transmission is calculated as follows:
keep_alive_sent = period - (period * margin)
:param period: Time period between keepalives.
Range is 0-10 seconds. 0 disables
the keepalive mechanism.
:param margin: Safety margin to assure keepalives
are sent before period expires. Range is 0.1 to 0.9
:returns: No return value
"""
asyncio.ensure_future(self.core.keep_alive(period, margin))
def playTone(self, pin, tone_command, frequency, duration=None):
"""
This method will call the Tone library for the selected pin.
It requires FirmataPlus to be loaded onto the arduino
If the tone command is set to TONE_TONE, then the specified
tone will be played.
Else, if the tone command is TONE_NO_TONE, then any currently
playing tone will be disabled.
:param pin: Pin number
:param tone_command: Either TONE_TONE, or TONE_NO_TONE
:param frequency: Frequency of tone
:param duration: Duration of tone in milliseconds
:returns: No return value
"""
task = asyncio.ensure_future(self.core.play_tone(pin, tone_command,
frequency, duration))
self.loop.run_until_complete(task)
def tone(self, pin, frequency, duration=None):
return playTone(self, pin, Constant.TONE_TONE, frequency, duration);
def noTone(self, pin):
return playTone(self, pin, Constant.TONE_NO_TONE, 0);
def sendReset(self):
"""
Send a Firmata reset command
:returns: No return value
"""
task = asyncio.ensure_future(self.core.send_reset())
self.loop.run_until_complete(task)
def configServo(self, pin, min_pulse=544, max_pulse=2400):
"""
This method configures the Arduino for servo operation.
:param pin: Servo control pin
:param min_pulse: Minimum pulse width
:param max_pulse: Maximum pulse width
:returns: No return value
"""
task = asyncio.ensure_future(self.core.servo_config(pin, min_pulse,
max_pulse))
self.loop.run_until_complete(task)
def setAnalogLatch(self, pin, threshold_type, threshold_value,
cb=None, cb_type=None):
"""
This method "arms" an analog pin for its data to be latched and
saved in the latching table.
If a callback method is provided, when latching criteria is achieved,
the callback function is called with latching data notification.
:param pin: Analog pin number (value following an 'A' designator,
i.e. A5 = 5
:param threshold_type: Constants.LATCH_GT | Constants.LATCH_LT |
Constants.LATCH_GTE | Constants.LATCH_LTE
:param threshold_value: numerical value - between 0 and 1023
:param cb: callback method
:param cb_type: Constants.CB_TYPE_DIRECT = direct call or
Constants.CB_TYPE_ASYNCIO = asyncio coroutine
:returns: True if successful, False if parameter data is invalid
"""
task = asyncio.ensure_future(self.core.set_analog_latch(pin, threshold_type, threshold_value, cb, cb_type))
result = self.loop.run_until_complete(task)
return result
def setDigitalLatch(self, pin, threshold_value, cb=None, cb_type=None):
"""
This method "arms" a digital pin for its data to be latched and saved
in the latching table.
If a callback method is provided, when latching criteria is achieved,
the callback function is called
with latching data notification.
:param pin: Digital pin number
:param threshold_value: 0 or 1
:param cb: callback function
:param cb_type: Constants.CB_TYPE_DIRECT = direct call or
Constants.CB_TYPE_ASYNCIO = asyncio coroutine
:returns: True if successful, False if parameter data is invalid
"""
task = asyncio.ensure_future(self.core.set_digital_latch(pin, threshold_value, cb, cb_type))
result = self.loop.run_until_complete(task)
return result
def pinMode(self, pin_number, pin_state, callback=None, cb_type=None):
"""
This method sets the pin mode for the specified pin.
:param pin_number: Arduino Pin Number
:param pin_state: INPUT/OUTPUT/ANALOG/PWM/PULLUP - for SERVO use
servo_config()
:param callback: Optional: A reference to a call back function to be
called when pin data value changes
:param cb_type: Constants.CB_TYPE_DIRECT = direct call or
Constants.CB_TYPE_ASYNCIO = asyncio coroutine
:returns: No return value
"""
task = asyncio.ensure_future(self.core.set_pin_mode(pin_number, pin_state, callback, cb_type))
self.loop.run_until_complete(task)
def setSamplingInterval(self, interval):
"""
This method sets the sampling interval for the Firmata loop method
:param interval: time in milliseconds
:returns: No return value
"""
task = asyncio.ensure_future(self.core.set_sampling_interval(interval))
self.loop.run_until_complete(task)
def delay(self, milliseconds):
"""
Perform an asyncio sleep for the time specified in milliseconds.
:param milliseconds: time in milliseconds
:returns: No return value
"""
self.sleep(milliseconds * 0.001)
def sleep(self, time):
"""
Perform an asyncio sleep for the time specified in seconds. T
his method should be used in place of time.sleep()
:param time: time in seconds
:returns: No return value
"""
try:
task = asyncio.ensure_future(self.core.sleep(time))
self.loop.run_until_complete(task)
except asyncio.CancelledError:
pass
except RuntimeError:
pass
def shutdown(self):
"""
Shutdown the application and exit
:returns: No return value
"""
task = asyncio.ensure_future(self.core.shutdown())
self.loop.run_until_complete(task)
# noinspection PyUnusedLocal
def configSonar(self, trigger_pin, echo_pin, cb=None, ping_interval=50,
max_distance=200, cb_type=None):
"""
Configure the pins,ping interval and maximum distance for an HC-SR04
type device.
Single pin configuration may be used. To do so, set both the trigger
and echo pins to the same value.
Up to a maximum of 6 SONAR devices is supported
If the maximum is exceeded a message is sent to the console and the
request is ignored.
NOTE: data is measured in centimeters
This is FirmataPlus feature.
:param trigger_pin: The pin number of for the trigger (transmitter).
:param echo_pin: The pin number for the received echo.
:param cb: optional callback function to report sonar data changes
:param ping_interval: Minimum interval between pings. Lowest number
to use is 33 ms.Max is 127
:param max_distance: Maximum distance in cm. Max is 200.
:param cb_type: direct call or asyncio yield from
:returns: No return value
"""
task = asyncio.ensure_future(self.core.sonar_config(trigger_pin,
echo_pin, cb,
ping_interval,
max_distance, cb_type))
self.loop.run_until_complete(task)
def getDataFromSonar(self, trigger_pin, waitTime = 500):
"""
Retrieve Ping (HC-SR04 type) data. The data is presented as a
dictionary.
The 'key' is the trigger pin specified in sonar_config() and the
'data' is the current measured distance (in centimeters)
for that pin. If there is no data, the value is set to None.
This is a FirmataPlus feature.
:param trigger_pin: trigger pin specified in sonar_config
:returns: active_sonar_map
"""
task = asyncio.ensure_future(self.core.ping_read(trigger_pin))
self.loop.run_until_complete(task)
if(waitTime < 200):
waitTime = 200
self.delay(waitTime)
task = asyncio.ensure_future(self.core.ping_data_retrieve(trigger_pin))
sonar_data = self.loop.run_until_complete(task)
return sonar_data
def configStepper(self, steps_per_revolution, stepper_pins):
"""
Configure stepper motor prior to operation.
This is a FirmataPlus feature.
:param steps_per_revolution: number of steps per motor revolution
:param stepper_pins: a list of control pin numbers - either 4 or 2
:returns: No return value
"""
task = asyncio.ensure_future(self.core.stepper_config(steps_per_revolution,
stepper_pins))
self.loop.run_until_complete(task)
def stepStepper(self, motor_speed, number_of_steps):
"""
Move a stepper motor for the number of steps at the specified speed
This is a FirmataPlus feature.
:param motor_speed: 21 bits of data to set motor speed
:param number_of_steps: 14 bits for number of steps & direction
positive is forward, negative is reverse
"""
task = asyncio.ensure_future(self.core.stepper_step(motor_speed,
number_of_steps))
self.loop.run_until_complete(task)
def configNeopixel(self, pin, count=12, brightness=128):
"""
Configure Neopixel strip prior to operation.
This is an extended firmata feature.
:param pin: data pin of the neopixel strip
:param count: number of pixels in strip
:param brightnessss: brigheness of the pixels
:returns: No return value
"""
task = asyncio.ensure_future(self.core.neopixel_config(pin, count, brightness))
self.loop.run_until_complete(task)
def setNeopixelColor(self, index, r, g, b):
"""
Set the color of the pixel at specified index.
This is an extended firmata feature.
:param index: position of the pixel to set the color
:param r: red value of the pixel color
:param g: green value of the pixel color
:param b: blue value of the pixel color
"""
task = asyncio.ensure_future(self.core.neopixel(index,r, g, b))
self.loop.run_until_complete(task)
def configLCD(self, address=0x3F, row=2, col=16):
"""
Configure LCD1602 using I2C.
This is an extended firmata feature.
:param address: I2C adddress of the LCD
:param row: number of rows on the LCD
:param col: number of chars on each row
:returns: No return value
"""
task = asyncio.ensure_future(self.core.lcd_config(address, col, row))
self.loop.run_until_complete(task)
def printOnLCD(self, text, row = 0, col = 0, backlight=True):
"""
Show specifiedd text on the LCD.
This is an extended firmata feature.
:param address: I2C adddress of the LCD
:param row: on which row to show the text
:param col: from which character to print the text
:returns: No return value
"""
task = asyncio.ensure_future(self.core.lcd_print(text, row, col, backlight))
self.loop.run_until_complete(task)
def clearLCD(self):
"""
Clear the LCD screen.
This is an extended firmata feature.
:returns: No return value
"""
task = asyncio.ensure_future(self.core.lcd_clear())
self.loop.run_until_complete(task)
def configDHT(self, pin):
"""
Configure a DHT device.
This is an extended firmata feature.
:param pin: data pin of the DHT11
:returns: No return value
"""
task = asyncio.ensure_future(self.core.dht_read(pin))
self.loop.run_until_complete(task)
def getDataFromDHT(self, pin, waitTime=500):
"""
Read the value from a DHT device.
This is an extended firmata feature.
:param pin: data pin of the DHT11
:returns: No return value
"""
if(waitTime < 100):
waitTime = 100
self.delay(waitTime)
task = asyncio.ensure_future(self.core.dht_data_retrieve(pin))
dhtdata = self.loop.run_until_complete(task)
return dhtdata
def pixyInit(self, max_blocks=5, cb=None, cb_type=None):
"""
Initialize Pixy and will enable Pixy block reporting.
This is a FirmataPlusRB feature.
:param cb: callback function to report Pixy blocks
:param cb_type: Constants.CB_TYPE_DIRECT = direct call or
Constants.CB_TYPE_ASYNCIO = asyncio coroutine
:param max_blocks: Maximum number of Pixy blocks to report when many
signatures are found.
:returns: No return value.
"""
task = asyncio.ensure_future(self.core.pixy_init(max_blocks, cb, cb_type))
self.loop.run_until_complete(task)
def pixyGetBlocks(self):
"""
This method retrieves the latest Pixy data value
:returns: Pixy data
"""
return self.core.pixy_blocks
def pixySetServos(self, s0, s1):
"""
Sends the setServos Pixy command.
This method sets the pan/tilt servos that are plugged into Pixy's two servo ports.
:param s0: value 0 to 1000
:param s1: value 0 to 1000
:returns: No return value.
"""
task = asyncio.ensure_future(self.core.pixy_set_servos(s0, s1))
self.loop.run_until_complete(task)
def pixySetBrightness(self, brightness):
"""
Sends the setBrightness Pixy command.
This method sets the brightness (exposure) of Pixy's camera.
:param brightness: range between 0 and 255 with 255 being the
brightest setting
:returns: No return value.
"""
task = asyncio.ensure_future(self.core.pixy_set_brightness(brightness))
self.loop.run_until_complete(task)
def pixySetLed(self, r, g, b):
"""
Sends the setLed Pixy command.
This method sets the RGB LED on front of Pixy.
:param r: red range between 0 and 255
:param g: green range between 0 and 255
:param b: blue range between 0 and 255
:returns: No return value.
"""
task = asyncio.ensure_future(self.core.pixy_set_led(r, g, b))
self.loop.run_until_complete(task)